Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction API #350

Merged
merged 5 commits into from Oct 28, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

@@ -99,6 +99,11 @@ const messages = {
* browser sends this to delete all records in a category.
*/
DELETE_SYNC_CATEGORY: _, /* @param {string} categoryName */
/**
* browser -> webview
* browser sends this to compact records in a category
*/
COMPACT_SYNC_CATEGORY: _, /* @param {string} categoryName */
/**
* webview -> browser
* webview sends this to delete all site settings.
@@ -7,6 +7,7 @@ const proto = require('./constants/proto')
const {limitConcurrency} = require('../lib/promiseHelper')
const s3Helper = require('../lib/s3Helper')
const serializer = require('../lib/serializer')
const LRUCache = require('lru-cache')

const CONFIG = require('./config')

@@ -63,6 +64,9 @@ const RequestUtil = function (opts = {}) {
this.saveAWSCredentials(credentials)
}
this.SQSUrlByCat = []
this.missingObjectsCache = new LRUCache(50)
// This is used to keep the most recent records for each object id
this.latestRecordsCache = new LRUCache(100)
}

/**
@@ -178,9 +182,12 @@ RequestUtil.prototype.parseAWSResponse = function (bytes) {
* @param {string} category - the category ID
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340)
* @param {number=} maxRecords Limit response to a given number of records. By default the Sync lib will fetch all matching records, which might take a long time. If falsey, fetch all records.
* @param {{
* compaction {boolean} // compact records while listing objects from S3
* }} opts
* @returns {Promise(Array.<Object>)}
*/
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken) {
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, opts = {}) {
const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: maxRecords || 1000,
@@ -192,8 +199,26 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
}
if (startAt) { options.StartAfter = `${prefix}/${startAt}` }
return this.withRetry(() => {
if (this.shouldListObject(startAt, category)) {
return s3Helper.listObjects(this.s3, options, !!maxRecords)
if (this.shouldListObject(startAt, category) || opts.compaction) {
const s3ObjectsPromise = s3Helper.listObjects(this.s3, options, !!maxRecords)
if (!opts.compaction) {
return s3ObjectsPromise
}
return new Promise((resolve, reject) => {
s3ObjectsPromise.then((s3Objects) => {
setTimeout(() => {
this.compactObjects(s3Objects.contents)
if (s3Objects.isTruncated) {
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, opts)
}
return new Promise((resolve, reject) => {
// compaction is done
resolve()
})
}, 15000)
})
resolve()
})
}

if (!this.SQSUrlByCat[category]) {
@@ -354,6 +379,11 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
const radix64 = require('../lib/radix64')
const output = []
const partBuffer = {}
const objectMap = {}
// restore the partBuffer from last round
this.missingObjectsCache.forEach((value, key, cache) => {
partBuffer[key] = value
})
for (let s3Object of s3Objects) {
const parsedKey = s3Helper.parseS3Key(s3Object.Key)
const fullCrc = parsedKey.recordCrc
@@ -362,23 +392,33 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
partBuffer[fullCrc] = partBuffer[fullCrc].concat(data)
data = partBuffer[fullCrc]
}
if (objectMap[fullCrc]) {
objectMap[fullCrc].push(s3Object.Key)
} else {
objectMap[fullCrc] = [s3Object.Key]
}
const dataBytes = s3Helper.s3StringToByteArray(data)
const dataCrc = radix64.fromNumber(crc.crc32.unsigned(dataBytes.buffer))
if (dataCrc === fullCrc) {
let decrypted = {}
try {
decrypted = this.decrypt(dataBytes)
decrypted.syncTimestamp = parsedKey.timestamp
output.push(decrypted)
output.push(
{ record: decrypted,
objects: objectMap[fullCrc]
})
} catch (e) {
console.log(`Record with CRC ${crc} can't be decrypted: ${e}`)
}
if (partBuffer[fullCrc]) { delete partBuffer[fullCrc] }
if (this.missingObjectsCache.has(fullCrc)) { this.missingObjectsCache.del(fullCrc) }
} else {
partBuffer[fullCrc] = data
}
}
for (let crc in partBuffer) {
this.missingObjectsCache.set(crc, partBuffer[crc])
console.log(`Record with CRC ${crc} is missing parts or corrupt.`)
}
return output
@@ -420,6 +460,34 @@ RequestUtil.prototype.put = function (category, record) {
})
}

/**
 * Compact one batch of S3 objects.
 *
 * The batch is decoded with s3ObjectsToRecords. For every objectId only the
 * record with the highest syncTimestamp is kept; the S3 keys that back the
 * superseded record are collected and handed to s3Helper.deleteObjects.
 * The record kept for each objectId is remembered in this.latestRecordsCache
 * so that records arriving in later batches can be compared against it.
 *
 * @param {Array.<Object>} s3Objects - S3 objects, in the form accepted by
 *   s3ObjectsToRecords
 */
RequestUtil.prototype.compactObjects = function (s3Objects) {
  let s3ObjectsToDelete = []
  const recordObjects = this.s3ObjectsToRecords(s3Objects)
  recordObjects.forEach((recordObject) => {
    const record = recordObject.record
    const id = JSON.stringify(record.objectId)
    if (!this.latestRecordsCache.has(id)) {
      // First time this objectId is seen: nothing to compare against yet.
      this.latestRecordsCache.set(id, recordObject)
      return
    }
    const cacheRecordObject = this.latestRecordsCache.get(id)
    const isNewer = record.syncTimestamp > cacheRecordObject.record.syncTimestamp
    const keptRecordObject = isNewer ? recordObject : cacheRecordObject
    const staleRecordObject = isNewer ? cacheRecordObject : recordObject
    if (isNewer) {
      this.latestRecordsCache.set(id, recordObject)
    }
    // The cache outlives a single compaction run, so the very record that is
    // cached can be listed again (same timestamp, same S3 keys). Never delete
    // a key that backs the record being kept, otherwise the only remaining
    // copy of the latest record would be removed.
    const keptKeys = new Set(keptRecordObject.objects.map((key) => key.toString()))
    const staleKeys = staleRecordObject.objects.filter((key) => !keptKeys.has(key.toString()))
    if (staleKeys.length > 0) {
      console.log('compaction deletes')
      console.log(staleRecordObject.record)
      s3ObjectsToDelete = s3ObjectsToDelete.concat(staleKeys)
    }
  })
  s3Helper.deleteObjects(this.s3, this.bucket, s3ObjectsToDelete)
}

RequestUtil.prototype.s3PostFormData = function (objectKey) {
let formData = new FormData() // eslint-disable-line
formData.append('key', objectKey)
@@ -58,10 +58,11 @@ const maybeSetDeviceId = (requester) => {
}
return requester.list(proto.categories.PREFERENCES)
.then(s3Objects => requester.s3ObjectsToRecords(s3Objects.contents))
.then((records) => {
.then((recordObjects) => {
let maxId = -1
if (records && records.length) {
records.forEach((record) => {
if (recordObjects && recordObjects.length) {
recordObjects.forEach((recordObject) => {
const record = recordObject.record
const device = record.device
if (device && record.deviceId[0] > maxId) {
maxId = record.deviceId[0]
@@ -86,9 +87,10 @@ const startSync = (requester) => {
* @returns {Array.<Object>}
*/
const getJSRecords = (s3Objects, filterFunction) => {
const records = requester.s3ObjectsToRecords(s3Objects)
const recordObjects = requester.s3ObjectsToRecords(s3Objects)
let jsRecords = []
for (let record of records) {
for (let recordObject of recordObjects) {
const record = recordObject.record
const jsRecord = recordUtil.syncRecordAsJS(record)
// Useful but stored in the S3 key.
jsRecord.syncTimestamp = record.syncTimestamp
@@ -184,6 +186,14 @@ const startSync = (requester) => {
requester.purgeUserQueue()
})
})
// browser -> webview: compact the records of one sync category.
// `category` must be a key of proto.categories; any other value throws.
ipc.on(messages.COMPACT_SYNC_CATEGORY, (e, category) => {
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
}
// Arguments to list(): startAt = 0, maxRecords = 1000, empty continuation
// token, and opts.compaction = true to request compaction while listing.
requester.list(proto.categories[category], 0, 1000, '', {compaction: true}).then(() => {
// NOTE(review): this is logged when the promise returned by list() settles;
// in compaction mode that promise appears to resolve before the delayed
// compaction work has run — confirm against RequestUtil.prototype.list.
logSync(`Compacting category: ${category}`)
})
})
ipc.on(messages.DELETE_SYNC_SITE_SETTINGS, (e) => {
logSync(`Deleting siteSettings`)
requester.deleteSiteSettings().then(() => {
@@ -335,3 +335,28 @@ module.exports.deletePrefix = function (s3, bucket, prefix, deleteIf) {
listAndDelete()
})
}

/**
 * Delete objects in s3.
 *
 * Keys are sent in batches because a single S3 DeleteObjects request accepts
 * at most 1000 keys. Each batch is issued as its own request; failures are
 * logged with console.error and are not propagated to the caller.
 * Nothing is sent when `objects` is empty.
 *
 * @param {AwsSdk.S3} s3
 * @param {string} bucket
 * @param {Array.<string>} objects - keys of the objects to delete
 */
module.exports.deleteObjects = function (s3, bucket, objects) {
  // Upper bound on keys per DeleteObjects request imposed by S3.
  const MAX_KEYS_PER_REQUEST = 1000
  for (let start = 0; start < objects.length; start += MAX_KEYS_PER_REQUEST) {
    const batch = objects.slice(start, start + MAX_KEYS_PER_REQUEST)
    s3.deleteObjects({
      Bucket: bucket,
      Delete: {
        Objects: batch.map((object) => ({ Key: object.toString() }))
      }
    }, function (err) {
      if (err) {
        console.error(err, err.stack)
      }
    })
  }
}

Some generated files are not rendered by default. Learn more.

@@ -67,6 +67,7 @@
"deep-equal": "^1.0.1",
"express": "^4.14.0",
"lodash.merge": "^4.6.2",
"lru-cache": "^5.1.1",
"protobufjs": "^6.8.0",
"raven": "^0.12.3",
"redis": "^2.8.0",
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.