Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach operations back to links #3

Merged
merged 4 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const checkRateLimit = require('./helpers').checkRateLimit;
const githubPullRequestsCreate = github => github.pullRequests.create

const ONE_HOUR_IN_SECONDS = 60 * 60;
const LINK_OPERATION_EXPIRY_TIME_IN_SECONDS = 24 * ONE_HOUR_IN_SECONDS;
const debug = require('debug')('backstroke:webhook-status-store');
const WebhookStatusStore = {
set(webhookId, status, expiresIn=24*ONE_HOUR_IN_SECONDS) {
set(webhookId, status, expiresIn=LINK_OPERATION_EXPIRY_TIME_IN_SECONDS) {
return new Promise((resolve, reject) => {
redis.set(`webhook:status:${webhookId}`, JSON.stringify(status), 'EX', expiresIn, (err, id) => {
if (err) {
Expand Down Expand Up @@ -60,6 +61,44 @@ const WebhookStatusStore = {
});
});
},
attachToLink(linkId, webhookId) {
return new Promise((resolve, reject) => {
// Get unix epoch timestamp in seconds.
// FIXME: should use redis time. We're not accounting for any sort of server time drift here.
const timestamp = Math.floor(new Date().getTime() / 1000);

// Step 1: Remove all link operations that have expired.
// This is a workaround because items within a set can't have a TTL, see:
// https://stackoverflow.com/a/37184581/4115328
redis.zremrangebyscore(
`webhook:operations:${linkId}`,
// From the start of time (1970) to 24h ago
0, timestamp - LINK_OPERATION_EXPIRY_TIME_IN_SECONDS,
err => {
if (err) {
reject(err);
} else {
// Step 2: Add our new link operation to the set associated with the link, giving it a
// score of the timestamp.
redis.zadd(`webhook:operations:${linkId}`, timestamp, webhookId, (err, data) => {
if (err) {
reject(err);
} else {
// Step 3: Set an expire time on the set associated with the link.
redis.expire(`webhook:operations:${linkId}`, LINK_OPERATION_EXPIRY_TIME_IN_SECONDS, err => {
if (err) {
reject(err);
} else {
// Resolves the cached data.
resolve(JSON.parse(data));
}
});
}
});
}
});
});
},
};

const WebhookQueue = {
Expand Down Expand Up @@ -111,7 +150,8 @@ WebhookQueue.initialize();

// Logging function to use in webhooks.
function logger() {
console.log.apply(console, [' *', ...arguments]);
const timestamp = (new Date()).toUTCString();
console.log.apply(console, [`* [${timestamp}] `, ...arguments]);
}


Expand Down
5 changes: 4 additions & 1 deletion src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ module.exports = async function processBatch(
const link = webhook.data.link;
const user = webhook.data.user;

// Inform redis of the associatio between the operation and the link.
await WebhookStatusStore.attachToLink(link.id, webhook.id);

// Log the type of update that is happening.
process.env.NODE_ENV !== 'test' && console.log(`=> * Handling webhook ${webhook.id}:`);
process.env.NODE_ENV !== 'test' && console.log(`* [${(new Date()).toUTCString()}] => Handling webhook ${webhook.id}:`);
debug(`From: ${link.upstreamOwner}/${link.upstreamRepo}@${link.upstreamBranch}`);
if (link.forkType === 'fork-all') {
debug(`To: all forks @ ${link.upstreamBranch} (uses upstream branch)`);
Expand Down
28 changes: 28 additions & 0 deletions src/worker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const MockWebhookQueue = {
reset() {
this.queue = [];
},

push(data) {
const id = (new Date()).getTime();
this.queue.push({id, data});
Expand All @@ -21,6 +22,12 @@ const MockWebhookQueue = {

const MockWebhookStatusStore = {
keys: {},
links: {},
reset() {
this.keys = {};
this.links = {};
},

set(webhookId, status) {
const id = (new Date()).getTime();
this.keys[webhookId] = {status, id};
Expand All @@ -29,9 +36,22 @@ const MockWebhookStatusStore = {
get(webhookId) {
return Promise.resolve(this.keys[webhookId].status);
},
attachToLink(linkId, webhookId) {
this.links[linkId] = [
...(this.links[linkId] || []),
webhookId,
];
return Promise.resolve();
}
};

describe('webhook worker', () => {
// Before each test, reset both mocks.
beforeEach(() => {
MockWebhookQueue.reset();
MockWebhookStatusStore.reset();
});

it('should create a pull request when given a single fork', async () => {
const createPullRequest = require('./helpers').createPullRequest;
const getForksForRepo = sinon.stub().resolves([{
Expand Down Expand Up @@ -108,6 +128,10 @@ describe('webhook worker', () => {
forkCount: 1,
response: 'Successfully created pull request on rgaus/backstroke',
});

// Make sure that the operation was properly attached to the link
// 8 = link id, enqueuedAs = link operation id
assert.deepEqual(MockWebhookStatusStore.links[8], [enqueuedAs]);
});
it('should create a pull request on each fork when given a bunch of forks', async () => {
const createPullRequest = require('./helpers').createPullRequest;
Expand Down Expand Up @@ -190,6 +214,10 @@ describe('webhook worker', () => {

// Should have created two pull requests.
assert.equal(pullRequestMock.callCount, 2);

// Make sure that the operation was properly attached to the link
// 8 = link id, enqueuedAs = link operation id
assert.deepEqual(MockWebhookStatusStore.links[8], [enqueuedAs]);
});

it('should try to make a PR to a single fork of an upstream, but the repo opted out', async () => {
Expand Down