Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction API #350

Merged
merged 5 commits into from Oct 28, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Prev

Do compaction in batches with a 15-second timeout per batch, and support
cross-batch compaction for up to 100 records

Add an LRU cache (50 records) for missing record parts so they can be
decrypted in a future round
  • Loading branch information
darkdh committed Oct 25, 2019
commit 78e8d8946b63c04ea8a65ba242658ddc9308e27b
@@ -7,6 +7,7 @@ const proto = require('./constants/proto')
const {limitConcurrency} = require('../lib/promiseHelper')
const s3Helper = require('../lib/s3Helper')
const serializer = require('../lib/serializer')
const LRUCache = require('lru-cache')

const CONFIG = require('./config')

@@ -63,6 +64,9 @@ const RequestUtil = function (opts = {}) {
this.saveAWSCredentials(credentials)
}
this.SQSUrlByCat = []
this.missingObjectsCache = new LRUCache(50)
// This is used to keep the most recent records for each object id
this.latestRecordsCache = new LRUCache(100)
}

/**
@@ -178,9 +182,12 @@ RequestUtil.prototype.parseAWSResponse = function (bytes) {
* @param {string} category - the category ID
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340)
 * @param {number=} maxRecords Limit response to a given number of records. By default the Sync lib will fetch all matching records, which might take a long time. If falsey, fetch all records.
* @param {{
 * compaction {boolean} // compact records while listing objects from S3
* }} opts
* @returns {Promise(Array.<Object>)}
*/
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken) {
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, opts = {}) {
const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: maxRecords || 1000,
@@ -192,8 +199,26 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
}
if (startAt) { options.StartAfter = `${prefix}/${startAt}` }
return this.withRetry(() => {
if (this.shouldListObject(startAt, category)) {
return s3Helper.listObjects(this.s3, options, !!maxRecords)
if (this.shouldListObject(startAt, category) || opts.compaction) {
const s3ObjectsPromise = s3Helper.listObjects(this.s3, options, !!maxRecords)
if (!opts.compaction) {
return s3ObjectsPromise
}
return new Promise((resolve, reject) => {
s3ObjectsPromise.then((s3Objects) => {
setTimeout(() => {
this.compactObjects(s3Objects.contents)
if (s3Objects.isTruncated) {
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, opts)
}
return new Promise((resolve, reject) => {
// compaction is done
resolve()
})
}, 15000)
})
resolve()
})
}

if (!this.SQSUrlByCat[category]) {
@@ -354,7 +379,11 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
const radix64 = require('../lib/radix64')
const output = []
const partBuffer = {}
let objectMap = {}
const objectMap = {}
// restore the partBuffer from last round
this.missingObjectsCache.forEach((value, key, cache) => {
partBuffer[key] = value
})
for (let s3Object of s3Objects) {
const parsedKey = s3Helper.parseS3Key(s3Object.Key)
const fullCrc = parsedKey.recordCrc
@@ -383,11 +412,13 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
console.log(`Record with CRC ${crc} can't be decrypted: ${e}`)
}
if (partBuffer[fullCrc]) { delete partBuffer[fullCrc] }
if (this.missingObjectsCache.has(fullCrc)) { this.missingObjectsCache.del(fullCrc) }
} else {
partBuffer[fullCrc] = data
}
}
for (let crc in partBuffer) {
this.missingObjectsCache.set(crc, partBuffer[crc])
console.log(`Record with CRC ${crc} is missing parts or corrupt.`)
}
return output
@@ -433,43 +464,28 @@ RequestUtil.prototype.put = function (category, record) {
 * Compact the given S3 objects, keeping only the most recent record per
 * object id and deleting the rest from S3
 * @param {Array.<Object>} s3Objects - S3 objects to compact
 */
RequestUtil.prototype.compactCategory = function (category) {
const thisCategory = category
if (!recordUtil.CATEGORY_IDS.includes(thisCategory)) {
throw new Error(`Unsupported sync category: ${category}`)
}

const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: 1000,
Bucket: this.bucket,
Prefix: prefix
}
this.withRetry(() => {
return s3Helper.listObjects(this.s3, options, false)
}).then((s3Objects) => {
// This is used to keep the most recent records for each object id
let latestRecords = {}
let s3ObjectsToDelete = []
const recordObjects = this.s3ObjectsToRecords(s3Objects.contents)
recordObjects.forEach((recordObject) => {
const record = recordObject.record
if (latestRecords[record.objectId]) {
console.log('compaction deletes')
if (record.syncTimestamp > latestRecords[record.objectId].record.syncTimestamp) {
s3ObjectsToDelete = s3ObjectsToDelete.concat(latestRecords[record.objectId].objects)
console.log(latestRecords[record.objectId].record)
latestRecords[record.objectId] = recordObject
} else {
s3ObjectsToDelete = s3ObjectsToDelete.concat(recordObject.objects)
console.log(record)
}
RequestUtil.prototype.compactObjects = function (s3Objects) {
  // Compacts the given S3 objects: for each sync object id, keep only the
  // most recent record and batch-delete the S3 objects backing all older
  // duplicates. this.latestRecordsCache persists the winner across batches
  // so cross-batch duplicates are also compacted.
  let s3ObjectsToDelete = []
  const recordObjects = this.s3ObjectsToRecords(s3Objects)
  recordObjects.forEach((recordObject) => {
    const record = recordObject.record
    // objectId is not a primitive; stringify it so it is a stable cache key.
    const id = JSON.stringify(record.objectId)
    if (this.latestRecordsCache.has(id)) {
      console.log('compaction deletes')
      const cacheRecordObject = this.latestRecordsCache.get(id)
      if (record.syncTimestamp > cacheRecordObject.record.syncTimestamp) {
        // Current record is newer: schedule the cached record's S3 objects
        // for deletion and remember the current record as the latest.
        s3ObjectsToDelete = s3ObjectsToDelete.concat(cacheRecordObject.objects)
        console.log(cacheRecordObject.record)
        this.latestRecordsCache.set(id, recordObject)
      } else {
        // Cached record is at least as new: delete the current record's objects.
        s3ObjectsToDelete = s3ObjectsToDelete.concat(recordObject.objects)
        console.log(record)
      }
    } else {
      // First occurrence of this object id in this compaction run.
      this.latestRecordsCache.set(id, recordObject)
    }
  })
  s3Helper.deleteObjects(this.s3, this.bucket, s3ObjectsToDelete)
}

RequestUtil.prototype.s3PostFormData = function (objectKey) {
@@ -190,8 +190,9 @@ const startSync = (requester) => {
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
}
logSync(`Compacting category: ${category}`)
requester.compactCategory(proto.categories[category])
requester.list(proto.categories[category], 0, 1000, '', {compaction: true}).then(() => {
logSync(`Compacting category: ${category}`)
})
})
ipc.on(messages.DELETE_SYNC_SITE_SETTINGS, (e) => {
logSync(`Deleting siteSettings`)

Some generated files are not rendered by default. Learn more.

@@ -67,6 +67,7 @@
"deep-equal": "^1.0.1",
"express": "^4.14.0",
"lodash.merge": "^4.6.2",
"lru-cache": "^5.1.1",
"protobufjs": "^6.8.0",
"raven": "^0.12.3",
"redis": "^2.8.0",
@@ -189,13 +189,14 @@ test('client RequestUtil', (t) => {
}

const testCanPutBookmarks = (t) => {
// This record will be split into 6 parts
const record = {
action: 'CREATE',
deviceId: new Uint8Array([0]),
objectId: testHelper.newUuid(),
bookmark: {
site: {
location: `https://brave.com?q=${'x'.repeat(4)}`,
location: `https://brave.com?q=${'x'.repeat(4096)}`,
title: 'lulz',
lastAccessedTime: 1480000000 * 1000,
creationTime: 1480000000 * 1000
@@ -211,7 +212,7 @@ test('client RequestUtil', (t) => {
objectId: testHelper.newUuid(),
bookmark: {
site: {
location: `https://brave.com?q=${'x'.repeat(4096)}`,
location: `https://brave.com?q=${'x'.repeat(4)}`,
title: 'lulz2',
lastAccessedTime: 1480000000 * 1000,
creationTime: 1480000000 * 1000
@@ -237,10 +238,14 @@ test('client RequestUtil', (t) => {

const testCanListObjects = (t) => {
t.test(`${t.name}`, (t) => {
t.plan(8)
requestUtil.list(proto.categories.BOOKMARKS)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.plan(10)
requestUtil.list(proto.categories.BOOKMARKS, 0, 3)
.then(s3Objects => {
t.equals(s3Objects.isTruncated, true)
t.deepEquals(requestUtil.s3ObjectsToRecords(s3Objects.contents), [])
return requestUtil.list(proto.categories.BOOKMARKS, 0, 3, s3Objects.nextContinuationToken)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
}).then((response) => {
const s3Record = response[0].record
// FIXME: Should this deserialize to 'CREATE' ?
t.equals(s3Record.action, 0, `${t.name}: action`)
@@ -285,7 +290,7 @@ test('client RequestUtil', (t) => {
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[0] ? response[0].record : response[0]
const s3Record = response[1] ? response[1].record : response[1]
// Here we always see error `Record with CRC xxxxx is missing parts or corrupt.`
// if record2.bookmark.site.location length is ~ 4096
if (!s3Record) {
@@ -418,6 +423,7 @@ test('client RequestUtil', (t) => {
order: '1.0.0.1'
}
}
// This record will be split into 6 parts
const record2 = {
action: 'CREATE',
deviceId: new Uint8Array([0]),
@@ -441,16 +447,21 @@ test('client RequestUtil', (t) => {

requestUtil.put(proto.categories.BOOKMARKS, record)
requestUtil.put(proto.categories.BOOKMARKS, record2)
for (let i = 0; i < 100; ++i) {
record_update.bookmark.site.title = `${record.bookmark.site.title} ${i}`
record2_update.bookmark.site.title = `${record2.bookmark.site.title} ${i}`
for (let i = 0; i < 5; ++i) {
requestUtil.put(proto.categories.BOOKMARKS, record_update)
requestUtil.put(proto.categories.BOOKMARKS, record2_update)
}
requestUtil.compactCategory(proto.categories.BOOKMARKS)
// takes about 1 minute for delete to be completely done
const consoleLogBak = console.log
// limit batch size to 10 to test cross batch compaction for around 40
// objects
requestUtil.list(proto.categories.BOOKMARKS, 0, 10, '', {compaction: true}).then(() => {
console.log = function() {}
})
// takes about 1.5 minute for delete to be completely done and we also
// have 15 second timeout for each batch
setTimeout(() => {
requestUtil.list(proto.categories.BOOKMARKS)
console.log = consoleLogBak
requestUtil.list(proto.categories.BOOKMARKS, 0, 0)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 2, `${t.name} check records number`)
@@ -465,7 +476,7 @@ test('client RequestUtil', (t) => {
.catch((error) => { t.fail(error) })
})
.catch((error) => { t.fail(error) })
}, 60000)
}, 90000)
})
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.