New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce device id v2 #362
Merged
+332
−35
Merged
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
4e5ed88
uuid init
darkdh 9e96726
deviceIdV2 and SQS queue migration
darkdh 77af179
Emit SAVE_INIT_DATA for existing devices to update deviceIdV2
darkdh 0387b4e
fix legacy device id reset issue
darkdh f6a25c1
Add device id v2 test
darkdh File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.
deviceIdV2 and SQS queue migration
- Loading branch information
darkdh
committed
Nov 26, 2019
darkdh
Anthony Tseng
commit 9e96726e869b8d8e7b35d27e36c43055527cb9fe
Verified
This commit was signed with a verified signature.
GPG key ID: E14094A20D3AAEEC
Learn about signing commits
| @@ -2,6 +2,7 @@ | ||
|
|
||
| const awsSdk = require('aws-sdk') | ||
| const cryptoUtil = require('./cryptoUtil') | ||
| const deepEqual = require('deep-equal') | ||
| const recordUtil = require('./recordUtil') | ||
| const proto = require('./constants/proto') | ||
| const {limitConcurrency} = require('../lib/promiseHelper') | ||
| @@ -64,6 +65,7 @@ const RequestUtil = function (opts = {}) { | ||
| this.saveAWSCredentials(credentials) | ||
| } | ||
| this.SQSUrlByCat = [] | ||
| this.oldSQSUrlByCat = [] | ||
| this.missingObjectsCache = new LRUCache(50) | ||
| // This is used to keep the most recent records for each object id | ||
| this.latestRecordsCache = new LRUCache(100) | ||
| @@ -249,8 +251,27 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin | ||
| WaitTimeSeconds: CONFIG.SQS_MESSAGES_LONGPOLL_TIMEOUT | ||
| } | ||
|
|
||
| return s3Helper.listNotifications(this.sqs, notificationParams, category, | ||
| prefix) | ||
| if (this.oldSQSUrlByCat[category]) { | ||
| let oldNotificationParams = Object.assign({}, notificationParams) | ||
| oldNotificationParams.QueueUrl = `${this.oldSQSUrlByCat[category]}` | ||
|
|
||
| return s3Helper.listNotifications(this.sqs, notificationParams, category, prefix).then((values) => { | ||
| if (this.shouldRetireOldSQSQueue(parseInt(values.createdTimeStamp))) { | ||
| return this.deleteSQSQueue(this.oldSQSUrlByCat[category]).then(() => { | ||
AlexeyBarabash
Contributor
|
||
| delete this.oldSQSUrlByCat[category] | ||
| return values | ||
| }) | ||
| } | ||
| return s3Helper.listNotifications(this.sqs, oldNotificationParams, category, prefix).then((oldValues) => { | ||
| if (deepEqual(values.contents, oldValues.contents, {strict: true})) { | ||
| return values | ||
| } | ||
| return {contents: values.contents.concat(oldValues.contents), | ||
| createdTimeStamp: values.createdTimeStamp} | ||
| }) | ||
| }) | ||
| } | ||
| return s3Helper.listNotifications(this.sqs, notificationParams, category, prefix) | ||
| }) | ||
| } | ||
|
|
||
| @@ -275,6 +296,14 @@ RequestUtil.prototype.shouldListObject = function (startAt, category) { | ||
| this.listInProgress === true | ||
| } | ||
|
|
||
| RequestUtil.prototype.shouldRetireOldSQSQueue = function (createdTimestamp) { | ||
| let currentTime = new Date().getTime() | ||
| let newQueueCreatedTime = | ||
| this.normalizeTimestampToMs(createdTimestamp, currentTime) | ||
|
|
||
| return (currentTime - newQueueCreatedTime) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 | ||
| } | ||
|
|
||
| /** | ||
| * Checks do we need to use s3 list Object or SQS notifications | ||
| * @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds | ||
| @@ -342,15 +371,16 @@ RequestUtil.prototype.sqsName = function (deviceId, category) { | ||
| /** | ||
| * Creates SQS for the current device. | ||
| * @param {string} deviceId | ||
| * @param {string} deviceUuid | ||
| * @param {string} deviceIdV2 | ||
| * @returns {Promise} | ||
| */ | ||
| RequestUtil.prototype.createAndSubscribeSQS = function (deviceId, deviceUuid) { | ||
| RequestUtil.prototype.createAndSubscribeSQS = function (deviceId, deviceIdV2) { | ||
| // Creating a query for the current userId | ||
| if (!deviceId) { | ||
| throw new Error('createSQS failed. deviceId is null!') | ||
| if (!deviceIdV2) { | ||
| throw new Error('createSQS failed. deviceIdV2 is null!') | ||
| } | ||
| this.deviceId = deviceId | ||
| this.deviceIdV2 = deviceIdV2 | ||
| const createAndSubscribeSQSforCategory = function (deviceId, category, thisRef) { | ||
| let newQueueParams = { | ||
| QueueName: thisRef.sqsName(deviceId, category), | ||
| @@ -370,10 +400,31 @@ RequestUtil.prototype.createAndSubscribeSQS = function (deviceId, deviceUuid) { | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| const subscribeOldSQSforCategory = function (deviceId, category, thisRef) { | ||
| return new Promise((resolve, reject) => { | ||
| let params = { | ||
| QueueName: thisRef.sqsName(deviceId, category) | ||
| } | ||
| thisRef.sqs.getQueueUrl(params, (error, data) => { | ||
| if (error) { | ||
| // queue doesn't exist | ||
| resolve() | ||
| } else if (data) { | ||
| thisRef.oldSQSUrlByCat[category] = data.QueueUrl | ||
| resolve() | ||
| } | ||
| }) | ||
| }) | ||
| } | ||
| var createSQSPromises = [] | ||
| // Simple for loop instead foreach to capture 'this' | ||
| for (var i = 0; i < CATEGORIES_FOR_SQS.length; ++i) { | ||
| createSQSPromises.push(createAndSubscribeSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| // Doesn't have to create about to deprecate quques | ||
| // createSQSPromises.push(createAndSubscribeSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| createSQSPromises.push(subscribeOldSQSforCategory(deviceId, CATEGORIES_FOR_SQS[i], this)) | ||
| createSQSPromises.push(createAndSubscribeSQSforCategory(deviceIdV2, CATEGORIES_FOR_SQS[i], this)) | ||
| // TODO(darkdh): encode into base64 | ||
| } | ||
|
|
||
| return Promise.all(createSQSPromises) | ||
| @@ -531,7 +582,7 @@ RequestUtil.prototype.deleteUser = function () { | ||
| RequestUtil.prototype.purgeUserCategoryQueue = function (category) { | ||
| return new Promise((resolve, reject) => { | ||
| let params = { | ||
| QueueName: this.sqsName(this.deviceId, category) | ||
| QueueName: this.sqsName(this.deviceIdV2, category) | ||
| } | ||
| this.sqs.getQueueUrl(params, (error, data) => { | ||
| if (error) { | ||
| @@ -554,6 +605,20 @@ RequestUtil.prototype.purgeUserCategoryQueue = function (category) { | ||
| }) | ||
| } | ||
|
|
||
| RequestUtil.prototype.deleteSQSQueue = function (url) { | ||
| return new Promise((resolve, reject) => { | ||
| let params = { | ||
| QueueUrl: url | ||
| } | ||
| this.sqs.deleteQueue(params, (err, data) => { | ||
| if (err) { | ||
| console.log('SQS deleteQueue failed with error: ' + err) | ||
| } | ||
| resolve([]) | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| RequestUtil.prototype.purgeUserQueue = function () { | ||
| var purgeQueuePromises = [] | ||
| for (var i = 0; i < CATEGORIES_FOR_SQS.length; ++i) { | ||
ProTip!
Use n and p to navigate between commits in a pull request.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
@AlexeyBarabash reminded me that when device id is duplicated, if there is an old Brave which doesn't contain the fix, its SQS queue will be unavailable until next relaunch because upgraded Brave deletes it after 24 hours