Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge pull request #363 from brave/poll_sqs_after24h #366

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Merge pull request #363 from brave/poll_sqs_after24h

previousFetchTime parameter for FETCH_SYNC_RECORDS
  • Loading branch information
AlexeyBarabash committed Jan 10, 2020
commit e1e90e12b633bb9a2c3040b43b7ec535ac6a3b58
@@ -37,7 +37,8 @@ jspm_packages
# Optional REPL history
.node_repl_history

bundles/*.js
# Generated bundles files
bundles/**/*.js

#Editors
.idea
@@ -57,7 +57,7 @@ const messages = {
* with new records, do
* GET_EXISTING_OBJECTS -> RESOLVE_SYNC_RECORDS -> RESOLVED_SYNC_RECORDS
*/
FETCH_SYNC_RECORDS: _, /* @param Array.<string> categoryNames, @param {number} startAt (in seconds or milliseconds), @param {number=} maxRecords limit response to a given max number of records. set to 0 or falsey to not limit the response */
FETCH_SYNC_RECORDS: _, /* @param Array.<string> categoryNames, @param {number} startAt (in seconds or milliseconds), @param {number=} maxRecords limit response to a given max number of records. set to 0 or falsey to not limit the response, @param {number} previousFetchTime (in milliseconds) */
/**
* browser -> webview
* sent to fetch all sync devices. webview responds with RESOLVED_SYNC_RECORDS.
@@ -211,7 +211,7 @@ RequestUtil.prototype.parseAWSResponse = function (bytes) {
* }} opts
* @returns {Promise(Array.<Object>)}
*/
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, opts = {}) {
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, previousFetchTime, opts = {}) {
const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: maxRecords || 1000,
@@ -223,7 +223,7 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
}
if (startAt) { options.StartAfter = `${prefix}/${startAt}` }
return this.withRetry(() => {
if (this.shouldListObject(startAt, category) || opts.compaction) {
if (this.shouldListObject(previousFetchTime, category) || opts.compaction) {
const s3ObjectsPromise = s3Helper.listObjects(this.s3, options, !!maxRecords)
if (!opts.compaction) {
return s3ObjectsPromise
@@ -237,7 +237,7 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
// wait for 15 seconds between batches
setTimeout(() => {
if (s3Objects.isTruncated) {
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, opts)
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, previousFetchTime, opts)
}
return new Promise((resolve, reject) => {
// compaction is done
@@ -305,16 +305,14 @@ const CATEGORIES_FOR_SQS = [proto.categories.BOOKMARKS, proto.categories.PREFERE

/**
 * Checks whether we need to use S3 listObjects or SQS notifications
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds
* @param {number=} previousFetchTime - the previous fetch time. Could be seconds or milliseconds
* @param {string} category - the category ID
* @returns {boolean}
*/
RequestUtil.prototype.shouldListObject = function (startAt, category) {
RequestUtil.prototype.shouldListObject = function (previousFetchTime, category) {
let currentTime = new Date().getTime()
let startAtToCheck = this.normalizeTimestampToMs(startAt, currentTime)

return !startAtToCheck ||
(currentTime - startAtToCheck) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 ||
return !previousFetchTime ||
(currentTime - previousFetchTime) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 ||
!CATEGORIES_FOR_SQS.includes(category) ||
this.listInProgress === true
}
@@ -336,17 +334,20 @@ RequestUtil.prototype.shouldRetireOldSQSQueue = function (createdTimestamp) {

/**
 * Normalizes a timestamp value to milliseconds
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds
* @param {number=} timeToNormalize could be seconds or milliseconds
* @param {number=} currentTime currentTime in milliseconds
* @returns {number=}
 * @returns {number=} the time, guaranteed to be in milliseconds
*/
RequestUtil.prototype.normalizeTimestampToMs = function (startAt, currentTime) {
let startAtToCheck = startAt
if (startAtToCheck && currentTime.toString().length - startAtToCheck.toString().length >= 3) {
startAtToCheck *= 1000
RequestUtil.prototype.normalizeTimestampToMs = function (timeToNormalize, currentTime) {
if (!timeToNormalize) {
return 0
}

if (timeToNormalize && currentTime.toString().length - timeToNormalize.toString().length >= 3) {
timeToNormalize *= 1000
}

return startAtToCheck
return timeToNormalize
}

/**
@@ -109,8 +109,8 @@ const startSync = (requester) => {
return jsRecords
}

ipc.on(messages.FETCH_SYNC_RECORDS, (e, categoryNames, startAt, limitResponse) => {
logSync(`fetching ${categoryNames} records after ${startAt}`)
ipc.on(messages.FETCH_SYNC_RECORDS, (e, categoryNames, startAt, limitResponse, previousFetchTime) => {
logSync(`fetching ${categoryNames} records after ${startAt} previous fetch is ${previousFetchTime}`)
categoryNames.forEach((category) => {
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
@@ -119,7 +119,7 @@ const startSync = (requester) => {
if (nextContinuationTokens[category]) {
continuationToken = nextContinuationTokens[category]
}
requester.list(proto.categories[category], startAt, limitResponse, continuationToken).then((s3Objects) => {
requester.list(proto.categories[category], startAt, limitResponse, continuationToken, previousFetchTime).then((s3Objects) => {
const jsRecords = getJSRecords(s3Objects.contents)
logSync(`got ${jsRecords.length} decrypted records in ${category} after ${startAt}`)
let lastRecordTimestamp
@@ -220,7 +220,7 @@ const startSync = (requester) => {
ipc.send(messages.GET_EXISTING_OBJECTS, category, jsRecords, 0, false)
}
if (!isCompactionInProgress) {
requester.list(proto.categories[category], 0, 1000, '',
requester.list(proto.categories[category], 0, 1000, '', 0,
{compaction: true, compactionDoneCb: compactionDone, compactionUpdateCb: compactionUpdate}).then(() => {
logSync(`Compacting category: ${category}`)
isCompactionInProgress = true
@@ -97,7 +97,7 @@ test('deviceId V2 migration', (t) => {
const testCanListFromOldQueue = (t) => {
t.test('can list notification from old SQS queue', (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 0, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 1)
@@ -127,7 +127,7 @@ test('deviceId V2 migration', (t) => {
const testCanListFromBothQueues = (t) => {
t.test('can list notifications from new and old SQS queues', (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 0, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 1)
@@ -52,7 +52,25 @@ test('client RequestUtil', (t) => {
})
const serializer = requestUtil.serializer

t.plan(2)
t.plan(3)

t.test('#should list object', (t) => {
t.plan(3)

t.equals(true,
requestUtil.shouldListObject(0, proto.categories.BOOKMARKS),
`${t.name}: previous fetch time is empty - use S3`)

t.equals(false,
requestUtil.shouldListObject(new Date().getTime()-1000*60, proto.categories.BOOKMARKS),
`${t.name}: previous fetch is not older than 24h - use SQS`)

const delta25hours = 1000*60*60*25
t.equals(true,
requestUtil.shouldListObject(new Date().getTime()-delta25hours, proto.categories.BOOKMARKS),
`${t.name}: previous fetch is older than 24h - use S3`)
})

t.test('#put preference: device', (t) => {
t.plan(2)
const deviceId = new Uint8Array([0])
@@ -288,7 +306,7 @@ test('client RequestUtil', (t) => {
const testCanListNotifications = (t) => {
t.test(`${t.name} can list notification`, (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 1000, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[1] ? response[1].record : response[1]
@@ -461,7 +479,7 @@ test('client RequestUtil', (t) => {
}
// limit batch size to 10 to test cross batch compaction for around 40
// objects
requestUtil.list(proto.categories.BOOKMARKS, 0, 10, '', {compaction: true,
requestUtil.list(proto.categories.BOOKMARKS, 0, 10, '', 0, {compaction: true,
compactionUpdateCb: compactionUpdate,
compactionDoneCb: () => {
console.log = consoleLogBak
@@ -474,7 +492,7 @@ test('client RequestUtil', (t) => {
compactedRecord.filter(record => record.objectId.toString() === record2_update.objectId.toString()).length, 5)
// we already have 15 second timeout for each batch so no need to
// do another wait after compaction is done
requestUtil.list(proto.categories.BOOKMARKS, 0, 0)
requestUtil.list(proto.categories.BOOKMARKS, 0, 0, '', 0)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 2, `${t.name} check records number`)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.