New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce device id v2 #362
Merged
+332
−35
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
4e5ed88
uuid init
darkdh 9e96726
deviceIdV2 and SQS queue migration
darkdh 77af179
Emit SAVE_INIT_DATA for existing devices to update deviceIdV2
darkdh 0387b4e
fix legacy device id reset issue
darkdh f6a25c1
Add device id v2 test
darkdh File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.
| @@ -2,6 +2,7 @@ | ||
|
|
||
| const awsSdk = require('aws-sdk') | ||
| const cryptoUtil = require('./cryptoUtil') | ||
| const deepEqual = require('deep-equal') | ||
| const recordUtil = require('./recordUtil') | ||
| const proto = require('./constants/proto') | ||
| const {limitConcurrency} = require('../lib/promiseHelper') | ||
| @@ -30,6 +31,26 @@ const isExpiredCredentialError = (error) => { | ||
| }) | ||
| } | ||
|
|
||
| const createAndSubscribeSQSforCategory = function (deviceId, category, thisRef) { | ||
| let newQueueParams = { | ||
| QueueName: thisRef.sqsName(deviceId, category), | ||
| Attributes: { | ||
| 'MessageRetentionPeriod': s3Helper.SQS_RETENTION | ||
| } | ||
| } | ||
| return new Promise((resolve, reject) => { | ||
| thisRef.sqs.createQueue(newQueueParams, (error, data) => { | ||
| if (error) { | ||
| console.log('SQS creation failed with error: ' + error) | ||
| reject(error) | ||
| } else if (data) { | ||
| thisRef.SQSUrlByCat[category] = data.QueueUrl | ||
| resolve([]) | ||
| } | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
| * @param {{ | ||
| * apiVersion: <string>, | ||
| @@ -64,6 +85,7 @@ const RequestUtil = function (opts = {}) { | ||
| this.saveAWSCredentials(credentials) | ||
| } | ||
| this.SQSUrlByCat = [] | ||
| this.oldSQSUrlByCat = [] | ||
| this.missingObjectsCache = new LRUCache(50) | ||
| // This is used to keep the most recent records for each object id | ||
| this.latestRecordsCache = new LRUCache(100) | ||
| @@ -249,8 +271,30 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin | ||
| WaitTimeSeconds: CONFIG.SQS_MESSAGES_LONGPOLL_TIMEOUT | ||
| } | ||
|
|
||
| return s3Helper.listNotifications(this.sqs, notificationParams, category, | ||
| prefix) | ||
| // We will fetch from both old and new SQS queues until old one gets retired | ||
| if (this.oldSQSUrlByCat[category]) { | ||
| let oldNotificationParams = Object.assign({}, notificationParams) | ||
| oldNotificationParams.QueueUrl = `${this.oldSQSUrlByCat[category]}` | ||
|
|
||
| return s3Helper.listNotifications( | ||
| this.sqs, notificationParams, category, prefix).then((values) => { | ||
| if (this.shouldRetireOldSQSQueue(parseInt(values.createdTimeStamp))) { | ||
| return this.deleteSQSQueue(this.oldSQSUrlByCat[category]).then(() => { | ||
AlexeyBarabash
Contributor
|
||
| delete this.oldSQSUrlByCat[category] | ||
| return values | ||
| }) | ||
| } | ||
| return s3Helper.listNotifications( | ||
| this.sqs, oldNotificationParams, category, prefix).then((oldValues) => { | ||
| if (deepEqual(values.contents, oldValues.contents, {strict: true})) { | ||
| return values | ||
| } | ||
| return {contents: values.contents.concat(oldValues.contents), | ||
| createdTimeStamp: values.createdTimeStamp} | ||
| }) | ||
| }) | ||
| } | ||
| return s3Helper.listNotifications(this.sqs, notificationParams, category, prefix) | ||
| }) | ||
| } | ||
|
|
||
| @@ -275,6 +319,21 @@ RequestUtil.prototype.shouldListObject = function (startAt, category) { | ||
| this.listInProgress === true | ||
| } | ||
|
|
||
| /** | ||
| * The retention time of our SQS queue is 24 hours so we will retire old SQS | ||
| * queue created by device id after 24 hours | ||
| * @param {number=} createdTimestamp is the when new device id v2 queue was | ||
| * created | ||
| * @returns {boolean} | ||
| */ | ||
| RequestUtil.prototype.shouldRetireOldSQSQueue = function (createdTimestamp) { | ||
| let currentTime = new Date().getTime() | ||
| let newQueueCreatedTime = | ||
| this.normalizeTimestampToMs(createdTimestamp, currentTime) | ||
|
|
||
| return (currentTime - newQueueCreatedTime) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 | ||
| } | ||
|
|
||
| /** | ||
| * Checks do we need to use s3 list Object or SQS notifications | ||
| * @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds | ||
| @@ -339,40 +398,57 @@ RequestUtil.prototype.sqsName = function (deviceId, category) { | ||
| return queueName | ||
| } | ||
|
|
||
| /** | ||
| * Main purpose of this function is to create old SQS queue to test device id v2 | ||
| * migration | ||
| * @param {string} deviceId | ||
| * @returns {Promise} | ||
| */ | ||
| RequestUtil.prototype.createAndSubscribeSQSForTest = function (deviceId) { | ||
| var createSQSPromises = [] | ||
| // Simple for loop instead foreach to capture 'this' | ||
| for (var i = 0; i < CATEGORIES_FOR_SQS.length; ++i) { | ||
| createSQSPromises.push(createAndSubscribeSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| } | ||
|
|
||
| return Promise.all(createSQSPromises) | ||
| } | ||
|
|
||
| /** | ||
| * Creates SQS for the current device. | ||
| * @param {string} deviceId | ||
| * @param {string} deviceIdV2 | ||
| * @returns {Promise} | ||
| */ | ||
| RequestUtil.prototype.createAndSubscribeSQS = function (deviceId) { | ||
| RequestUtil.prototype.createAndSubscribeSQS = function (deviceId, deviceIdV2) { | ||
| // Creating a query for the current userId | ||
| if (!deviceId) { | ||
| throw new Error('createSQS failed. deviceId is null!') | ||
| if (!deviceIdV2) { | ||
| throw new Error('createSQS failed. deviceIdV2 is null!') | ||
| } | ||
| this.deviceId = deviceId | ||
| const createAndSubscribeSQSforCategory = function (deviceId, category, thisRef) { | ||
| let newQueueParams = { | ||
| QueueName: thisRef.sqsName(deviceId, category), | ||
| Attributes: { | ||
| 'MessageRetentionPeriod': s3Helper.SQS_RETENTION | ||
| } | ||
| } | ||
| this.deviceIdV2 = deviceIdV2 | ||
|
|
||
| const subscribeOldSQSforCategory = function (deviceId, category, thisRef) { | ||
| return new Promise((resolve, reject) => { | ||
| thisRef.sqs.createQueue(newQueueParams, (error, data) => { | ||
| let params = { | ||
| QueueName: thisRef.sqsName(deviceId, category) | ||
| } | ||
| thisRef.sqs.getQueueUrl(params, (error, data) => { | ||
| if (error) { | ||
| console.log('SQS creation failed with error: ' + error) | ||
| reject(error) | ||
| // queue doesn't exist | ||
| resolve() | ||
| } else if (data) { | ||
| thisRef.SQSUrlByCat[category] = data.QueueUrl | ||
| resolve([]) | ||
| thisRef.oldSQSUrlByCat[category] = data.QueueUrl | ||
| resolve() | ||
| } | ||
| }) | ||
| }) | ||
| } | ||
| var createSQSPromises = [] | ||
| // Simple for loop instead foreach to capture 'this' | ||
| for (var i = 0; i < CATEGORIES_FOR_SQS.length; ++i) { | ||
| createSQSPromises.push(createAndSubscribeSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| // Doesn't have to create about to deprecated queues | ||
| createSQSPromises.push(subscribeOldSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| createSQSPromises.push(createAndSubscribeSQSforCategory(deviceIdV2, CATEGORIES_FOR_SQS[i], this)) | ||
| } | ||
|
|
||
| return Promise.all(createSQSPromises) | ||
| @@ -530,7 +606,7 @@ RequestUtil.prototype.deleteUser = function () { | ||
| RequestUtil.prototype.purgeUserCategoryQueue = function (category) { | ||
| return new Promise((resolve, reject) => { | ||
| let params = { | ||
| QueueName: this.sqsName(this.deviceId, category) | ||
| QueueName: this.sqsName(this.deviceIdV2, category) | ||
| } | ||
| this.sqs.getQueueUrl(params, (error, data) => { | ||
| if (error) { | ||
| @@ -553,6 +629,24 @@ RequestUtil.prototype.purgeUserCategoryQueue = function (category) { | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
| * Delete SQS queue by url | ||
| * @param {string} url - SQS queue url | ||
| */ | ||
| RequestUtil.prototype.deleteSQSQueue = function (url) { | ||
| return new Promise((resolve, reject) => { | ||
| let params = { | ||
| QueueUrl: url | ||
| } | ||
| this.sqs.deleteQueue(params, (err, data) => { | ||
| if (err) { | ||
| console.log('SQS deleteQueue failed with error: ' + err) | ||
| } | ||
| resolve([]) | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| RequestUtil.prototype.purgeUserQueue = function () { | ||
| var purgeQueuePromises = [] | ||
| for (var i = 0; i < CATEGORIES_FOR_SQS.length; ++i) { | ||
Oops, something went wrong.
ProTip!
Use n and p to navigate between commits in a pull request.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
@AlexeyBarabash reminded me that when device id is duplicated, if there is an old Brave which doesn't contain the fix, its SQS queue will be unavailable until next relaunch because upgraded Brave deletes it after 24 hours