Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction API #350

Merged
merged 5 commits into from Oct 28, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Next

Compact records based on timestamp and keep the latest one

  • Loading branch information
darkdh committed Oct 16, 2019
commit 5fe3f999eb4738bb42994c2efe94da229167b5be
@@ -354,6 +354,7 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
const radix64 = require('../lib/radix64')
const output = []
const partBuffer = {}
let objectMap = {}
for (let s3Object of s3Objects) {
const parsedKey = s3Helper.parseS3Key(s3Object.Key)
const fullCrc = parsedKey.recordCrc
@@ -362,14 +363,22 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
partBuffer[fullCrc] = partBuffer[fullCrc].concat(data)
data = partBuffer[fullCrc]
}
if (objectMap[fullCrc]) {
objectMap[fullCrc].push(s3Object.Key)
} else {
objectMap[fullCrc] = [s3Object.Key]
}
const dataBytes = s3Helper.s3StringToByteArray(data)
const dataCrc = radix64.fromNumber(crc.crc32.unsigned(dataBytes.buffer))
if (dataCrc === fullCrc) {
let decrypted = {}
try {
decrypted = this.decrypt(dataBytes)
decrypted.syncTimestamp = parsedKey.timestamp
output.push(decrypted)
output.push(
{ record: decrypted,
objects: objectMap[fullCrc]
})
} catch (e) {
console.log(`Record with CRC ${crc} can't be decrypted: ${e}`)
}
@@ -420,6 +429,42 @@ RequestUtil.prototype.put = function (category, record) {
})
}

RequestUtil.prototype.compactRecords = function (category) {
const thisCategory = category
if (!recordUtil.CATEGORY_IDS.includes(thisCategory)) {
throw new Error(`Unsupported sync category: ${category}`)
}

const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: 1000,
This conversation was marked as resolved by SergeyZhukovsky

This comment has been minimized.

Copy link
@SergeyZhukovsky

SergeyZhukovsky Oct 22, 2019

Member

Do we list 1000 objects at a time? For example we have 10K records and a client sends that command. We will check only 1000 records. There could be a situation when we have 2K unique records and 8K need to be compacted, but we pull 1000 unique always and this will never be compacted. The list command returns nextContinuationToken which should be used on a next execution to avoid getting same records.

This comment has been minimized.

Copy link
@darkdh

darkdh Oct 22, 2019

Author Member

https://github.com/brave/sync/pull/350/files#diff-93951cfb12e03132f0bd5b9a600a0a27R449
I thought specify limitResponse to false and it will call the listObjectsV2 recursively with nextContinuationToken untill we don't see isTruncated

This comment has been minimized.

Copy link
@darkdh

darkdh Oct 22, 2019

Author Member

so theoretically, we will get all records down for 1000 each batch, the listObjectsV2 API maximum key limit is also 1000

This comment has been minimized.

Copy link
@SergeyZhukovsky

SergeyZhukovsky Oct 22, 2019

Member

ah yeah, missed the part that you pass false there

Bucket: this.bucket,
Prefix: prefix
}
this.withRetry(() => {
return s3Helper.listObjects(this.s3, options, false)
}).then((s3Objects) => {
let latestRecords = {}

This comment has been minimized.

Copy link
@bridiver

bridiver Oct 21, 2019

Contributor

this is going to keep the most recent record in each batch, which is fine, but I think we should add a comment to make that clear

let s3ObjectsToDelete = []
const recordObjects = this.s3ObjectsToRecords(s3Objects.contents)
console.error(recordObjects)
recordObjects.forEach((recordObject) => {
const record = recordObject.record
This conversation was marked as resolved by AlexeyBarabash

This comment has been minimized.

Copy link
@AlexeyBarabash

AlexeyBarabash Oct 21, 2019

Contributor

I would ask to make additional logging of the records being purged at compactCategory: action, objectId and timestamp to test and diagnose it easier.

if (latestRecords[record.objectId]) {
if (record.syncTimestamp > latestRecords[record.objectId].record.syncTimestamp) {
s3ObjectsToDelete = s3ObjectsToDelete.concat(latestRecords[record.objectId].objects)
latestRecords[record.objectId] = recordObject
} else {
s3ObjectsToDelete = s3ObjectsToDelete.concat(recordObject.objects)
}
} else {
latestRecords[record.objectId] = recordObject
}
})
s3Helper.deleteObjects(this.s3, this.bucket, s3ObjectsToDelete)
})
}

RequestUtil.prototype.s3PostFormData = function (objectKey) {
let formData = new FormData() // eslint-disable-line
formData.append('key', objectKey)
@@ -58,10 +58,11 @@ const maybeSetDeviceId = (requester) => {
}
return requester.list(proto.categories.PREFERENCES)
.then(s3Objects => requester.s3ObjectsToRecords(s3Objects.contents))
.then((records) => {
.then((recordObjects) => {
let maxId = -1
if (records && records.length) {
records.forEach((record) => {
if (recordObjects && recordObjects.length) {
recordObjects.forEach((recordObject) => {
const record = recordObject.record
const device = record.device
if (device && record.deviceId[0] > maxId) {
maxId = record.deviceId[0]
@@ -86,9 +87,10 @@ const startSync = (requester) => {
* @returns {Array.<Object>}
*/
const getJSRecords = (s3Objects, filterFunction) => {
const records = requester.s3ObjectsToRecords(s3Objects)
const recordObjects = requester.s3ObjectsToRecords(s3Objects)
let jsRecords = []
for (let record of records) {
for (let recordObject of recordObjects) {
const record = recordObject.record
const jsRecord = recordUtil.syncRecordAsJS(record)
// Useful but stored in the S3 key.
jsRecord.syncTimestamp = record.syncTimestamp
@@ -110,6 +112,9 @@ const startSync = (requester) => {
if (nextContinuationTokens[category]) {
continuationToken = nextContinuationTokens[category]
}
if (proto.categories[category] === proto.categories.BOOKMARKS) {
requester.compactRecords(proto.categories.BOOKMARKS)
}
requester.list(proto.categories[category], startAt, limitResponse, continuationToken).then((s3Objects) => {
const jsRecords = getJSRecords(s3Objects.contents)
logSync(`got ${jsRecords.length} decrypted records in ${category} after ${startAt}`)
@@ -335,3 +335,24 @@ module.exports.deletePrefix = function (s3, bucket, prefix, deleteIf) {
listAndDelete()
})
}

module.exports.deleteObjects = function (s3, bucket, objects) {
if (objects.length > 0) {
s3.deleteObjects({
Bucket: bucket,
Delete: {
Objects: objects.map(object => {
var newObj = {}
newObj['Key'] = object.toString()
return newObj
})
}
}, function (err, data) {
if (err) {
console.error(err, err.stack)
} else {
console.log(data)
}
})
}
}
@@ -79,7 +79,7 @@ test('client RequestUtil', (t) => {
requestUtil.list(proto.categories.PREFERENCES)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[0]
const s3Record = response[0].record
// FIXME: Should this deserialize to 'CREATE' ?
t.equals(s3Record.action, 0, `${t.name}: action`)
t.deepEquals(s3Record.deviceId, deviceRecord.deviceId, `${t.name}: deviceId`)
@@ -128,9 +128,9 @@ test('client RequestUtil', (t) => {
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 3, t.name)
const s3Record0 = response[0]
const s3Record1 = response[1]
const s3Record2 = response[2]
const s3Record0 = response[0].record
const s3Record1 = response[1].record
const s3Record2 = response[2].record
t.equals(s3Record0.device.name, `pyramid at ${TIME_B}`)
t.equals(s3Record1.device.name, `pyramid at ${TIME_C}`)
t.equals(s3Record2.device.name, `pyramid at ${TIME_D}`)
@@ -175,7 +175,7 @@ test('client RequestUtil', (t) => {
requestUtil.list(proto.categories.HISTORY_SITES)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[0]
const s3Record = response[0].record
// FIXME: Should this deserialize to 'CREATE' ?
t.equals(s3Record.action, 0, `${t.name}: action`)
t.deepEquals(s3Record.deviceId, record.deviceId, `${t.name}: deviceId`)
@@ -241,7 +241,7 @@ test('client RequestUtil', (t) => {
requestUtil.list(proto.categories.BOOKMARKS)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[0]
const s3Record = response[0].record
// FIXME: Should this deserialize to 'CREATE' ?
t.equals(s3Record.action, 0, `${t.name}: action`)
t.deepEquals(s3Record.deviceId, record.deviceId, `${t.name}: deviceId`)
@@ -285,7 +285,7 @@ test('client RequestUtil', (t) => {
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[0]
const s3Record = response[0] ? response[0].record : response[0]
// Here we always see error `Record with CRC xxxxx is missing parts or corrupt.`
// if record2.bookmark.site.location length is ~ 4096
if (!s3Record) {
@@ -385,7 +385,7 @@ test('client RequestUtil', (t) => {
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 1, `${t.name} deletes records`)
const s3Record = response[0]
const s3Record = response[0].record
t.assert(s3Record.device && s3Record.device.name, `${t.name} preserves device records`)
testCanLimitResponse(t)
})
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.