Skip to content

Commit

Permalink
Activity collections (likes/shares) refactor for update verification:
Browse files Browse the repository at this point in the history
1. Colletion objects embedded directly in activities in storage & transit
2. Updates to collections are upublished by updating the parent activity
3. Storage updates for activity updates, denormalized collections
  • Loading branch information
wmurphyrd committed Jan 9, 2021
1 parent 39ebf2d commit b72226d
Show file tree
Hide file tree
Showing 13 changed files with 453 additions and 158 deletions.
38 changes: 25 additions & 13 deletions net/activity.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,18 @@ module.exports = {
case 'announce':
toDo.push((async () => {
const targetActivity = object
// add to object shares collection, increment share count
if (apex.isLocalIRI(targetActivity.id) && targetActivity.shares) {
const shares = apex.objectIdFromValue(targetActivity.shares)
// add to object shares collection, increment share count
activity = await apex.store
.updateActivityMeta(activity, 'collection', targetActivity.shares[0])
// publish update to shares count
resLocal.postWork.push(async () => {
return apex.publishUpdate(recipient, await apex.getShares(targetActivity), actorId)
})
.updateActivityMeta(activity, 'collection', shares)
// publish updated object with updated shares count
const updatedTarget = await apex.updateCollection(shares)
if (updatedTarget) {
resLocal.postWork.push(async () => {
return apex.publishUpdate(recipient, updatedTarget, actorId)
})
}
}
})())
break
Expand All @@ -124,14 +128,18 @@ module.exports = {
case 'like':
toDo.push((async () => {
const targetActivity = object
// add to object likes collection, incrementing like count
if (apex.isLocalIRI(targetActivity.id) && targetActivity.likes) {
const likes = apex.objectIdFromValue(targetActivity.likes)
// add to object likes collection, incrementing like count
activity = await apex.store
.updateActivityMeta(activity, 'collection', targetActivity.likes[0])
// publish update to shares count
resLocal.postWork.push(async () => {
return apex.publishUpdate(recipient, await apex.getLikes(targetActivity), actorId)
})
.updateActivityMeta(activity, 'collection', likes)
// publish updated object with updated likes count
const updatedTarget = await apex.updateCollection(likes)
if (updatedTarget) {
resLocal.postWork.push(async () => {
return apex.publishUpdate(recipient, updatedTarget, actorId)
})
}
}
})())
break
Expand Down Expand Up @@ -163,7 +171,11 @@ module.exports = {
}
break
case 'update':
toDo.push(apex.store.updateObject(object, actorId, true))
if (apex.validateActivity(object)) {
toDo.push(apex.store.updateActivity(object, true))
} else {
toDo.push(apex.store.updateObject(object, actorId, true))
}
break
}
Promise.all(toDo).then(() => {
Expand Down
4 changes: 2 additions & 2 deletions net/security.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ function verifyActor (req, res, next) {
}

async function verifySignature (req, res, next) {
const apex = req.app.locals.apex
try {
const apex = req.app.locals.apex
// support for apps not using signature extension to ActivityPub
if (!req.get('authorization') && !req.get('signature')) {
if (req.app.get('env') !== 'development') {
Expand All @@ -43,7 +43,7 @@ async function verifySignature (req, res, next) {
// user delete message that can't be verified because we don't have the user cached
return res.status(200).send()
}
this.logger.warn('error during signature verification', err.message)
apex.logger.warn('error during signature verification', err.message)
return res.status(500).send()
}
}
25 changes: 16 additions & 9 deletions net/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ function inboxActivity (req, res, next) {
const object = resLocal.object
const actor = resLocal.actor
const recipient = resLocal.target
const tasks = []
if (!apex.validateActivity(activity)) {
resLocal.status = 400
resLocal.statusMessage = 'Invalid activity'
Expand All @@ -105,9 +106,8 @@ function inboxActivity (req, res, next) {
}
if (type === 'update') {
if (apex.validateActivity(object)) {
resLocal.status = 400
resLocal.statusMessage = 'Updates to activities not supported'
return next()
// update activity collection info
tasks.push(apex.embedCollections(object))
}
} else if (type === 'delete' && object) {
if (apex.validateActivity(object)) {
Expand All @@ -128,9 +128,12 @@ function inboxActivity (req, res, next) {
return next()
}
}
apex.addMeta(req.body, 'collection', recipient.inbox[0])
res.locals.apex.activity = true
next()
tasks.push(apex.embedCollections(activity))
Promise.all(tasks).then(() => {
apex.addMeta(req.body, 'collection', recipient.inbox[0])
res.locals.apex.activity = true
next()
}).catch(next)
}

async function jsonld (req, res, next) {
Expand Down Expand Up @@ -260,8 +263,12 @@ function outboxCreate (req, res, next) {
}
})
activity = apex.buildActivity('Create', actorIRI, object.to, extras)
} else if (activity.type.toLowerCase() === 'create' && activity.object) {
activity.object[0].id = apex.utils.objectIdToIRI()
} else {
if (activity.type.toLowerCase() === 'create' && activity.object) {
activity.object[0].id = apex.utils.objectIdToIRI()
}
// run through builder to format & ensure published, shares, likes included
activity = apex.buildActivity(activity.type, actorIRI, activity.to, activity)
}
Promise.resolve(activity).then(actResolved => {
req.body = actResolved
Expand Down Expand Up @@ -298,7 +305,7 @@ function outboxActivityObject (req, res, next) {
next()
})
}
async function outboxActivity (req, res, next) {
function outboxActivity (req, res, next) {
if (!res.locals.apex.target) {
return next()
}
Expand Down
82 changes: 56 additions & 26 deletions pub/activity.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
'use strict'

const merge = require('deepmerge')
module.exports = {
acceptFollow,
address,
addToOutbox,
buildActivity,
buildTombstone,
embedCollections,
publishUndoUpdate,
publishUpdate,
resolveActivity
}

function buildActivity (type, actorId, to, etc = {}) {
async function buildActivity (type, actorId, to, etc = {}) {
const activityId = this.store.generateId()
const collections = this.utils.idToActivityCollections(activityId)
const act = merge.all([
{
id: this.utils.activityIdToIRI(activityId),
type,
actor: actorId,
to,
published: new Date().toISOString()
},
collections,
etc
])
return this.fromJSONLD(act).then(activity => {
activity._meta = {}
return activity
})
let activity = this.mergeJSONLD({
id: this.utils.activityIdToIRI(activityId),
type,
actor: actorId,
to,
published: new Date().toISOString()
}, etc)
activity = await this.fromJSONLD(activity)
for (const key in collections) {
activity[key] = [await this.buildCollection(collections[key], true, 0)]
}
activity._meta = {}
return activity
}

async function buildTombstone (object) {
Expand Down Expand Up @@ -90,10 +88,10 @@ async function address (activity, sender, audienceOverride) {
return result.value
}
if (result.value.items) {
return result.value.items.map(this.resolveObject)
return result.value.items.map(id => this.resolveObject(id))
}
if (result.value.orderedItems) {
return result.value.orderedItems.map(this.resolveObject)
return result.value.orderedItems.map(id => this.resolveObject(id))
}
return undefined
})
Expand Down Expand Up @@ -137,21 +135,53 @@ async function acceptFollow (actor, targetActivity) {
return { postTask, updated }
}

async function embedCollections (activity) {
if (this.isLocalIRI(activity.id)) {
if (this.isString(activity.shares?.[0])) {
activity.shares = [await this.getCollection(activity.shares)]
}
if (this.isString(activity.likes?.[0])) {
activity.likes = [await this.getCollection(activity.likes)]
}
} else {
if (this.isString(activity.shares?.[0])) {
activity.shares = [await this.resolveObject(activity.shares, false, true)]
}
// if not paged, don't duplicate items in embedded copies
delete activity.shares?.[0]?.orderedItems
if (this.isString(activity.likes?.[0])) {
activity.likes = [await this.resolveObject(activity.likes, false, true)]
}
delete activity.likes?.[0]?.orderedItems
}
return activity
}

// undo may need to publish updates on behalf of multiple
// actors to completely clear the activity
async function publishUndoUpdate (colId, actor, audience) {
const info = this.utils.iriToCollectionInfo(colId)
let actorId
let updated
let updatedActorId
if (!['followers', 'following', 'liked', 'likes', 'shares'].includes(info?.name)) {
return
}
if (info.activity) {
const activityIRI = this.utils.activityIdToIRI(info.activity)
actorId = (await this.store.getActivity(activityIRI))?.actor[0]
updated = await this.updateCollection(colId)
updatedActorId = updated.actor[0]
} else {
updated = await this.getCollection(colId)
updatedActorId = this.utils.usernameToIRI(info.actor)
}
if (actor.id === (actorId ?? this.utils.usernameToIRI(info.actor))) {
const colUpdate = await this.getCollection(colId)
return this.publishUpdate(actor, colUpdate, audience)
if (actor.id === updatedActorId) {
return this.publishUpdate(actor, updated, audience)
} else {
return this.publishUpdate(
await this.store.getObject(updatedActorId, true),
updated,
audience
)
}
return undefined
}

async function publishUpdate (actor, object, cc) {
Expand Down
65 changes: 49 additions & 16 deletions pub/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const overlaps = require('overlaps')

module.exports = {
buildCollection,
buildCollectionPage,
getCollection,
getInbox,
getOutbox,
Expand All @@ -14,7 +16,26 @@ module.exports = {
getAdded,
getBlocked,
getRejected,
getRejections
getRejections,
updateCollection,
}

function buildCollection (id, isOrdered, totalItems) {
return this.fromJSONLD({
id,
type: isOrdered ? 'OrderedCollection' : 'Collection',
totalItems,
first: this.addPageToIRI(id, true)
})
}

async function buildCollectionPage (collectionId, page, isOrdered, lastItemId) {
return this.fromJSONLD({
id: this.addPageToIRI(collectionId, page),
partOf: collectionId,
type: isOrdered ? 'OrderedCollectionPage' : 'CollectionPage',
next: lastItemId ? this.addPageToIRI(collectionId, lastItemId) : null
})
}

/* page: MongoDB _id of item to begin querying after (i.e. last item of last page) or
Expand All @@ -23,14 +44,11 @@ module.exports = {
* Infinity - get all items (internal use only)
*/
async function getCollection (collectionId, page, remapper, blockList) {
collectionId = this.objectIdFromValue(collectionId)
if (!page) {
// if page isn't specified, just collection description is served
return this.fromJSONLD({
id: collectionId,
type: 'OrderedCollection',
totalItems: await this.store.getStreamCount(collectionId),
first: this.addPageToIRI(collectionId, true)
})
const totalItems = await this.store.getStreamCount(collectionId)
return this.buildCollection(collectionId, true, totalItems)
}
let after = page
let limit = this.itemsPerPage
Expand All @@ -41,23 +59,23 @@ async function getCollection (collectionId, page, remapper, blockList) {
after = null
limit = null
}
const pageObj = {
id: this.addPageToIRI(collectionId, page),
partOf: collectionId,
type: 'OrderedCollectionPage'
}
let stream = await this.store.getStream(collectionId, limit, after)
if (stream.length) {
pageObj.next = this.addPageToIRI(collectionId, stream[stream.length - 1]._id)
}
const pageObj = await this.buildCollectionPage(
collectionId,
page,
true,
// determine next page prior to filtering so
// you can pass large blocks of filtered activities
stream[stream.length - 1]?._id
)
if (blockList) {
stream = stream.filter(act => !overlaps(blockList, act.actor))
}
if (remapper) {
stream = stream.map(remapper)
}
pageObj.orderedItems = stream
return this.fromJSONLD(pageObj)
return pageObj
}

function getInbox (actor, page) {
Expand Down Expand Up @@ -108,6 +126,21 @@ function getRejections (actor, page) {
return this.getCollection(rejectionsIRI, page, idRemapper)
}

async function updateCollection (collectionId) {
collectionId = this.objectIdFromValue(collectionId)
const info = this.utils.iriToCollectionInfo(collectionId)
// shares/likes have to be embedded in their activity
// for verifiable updates because the actor id is not in
// the collection object
if (info.activity) {
// updated embedded copies in activity
return this.store.updateActivity({
id: this.utils.activityIdToIRI(info.activity),
[info.name]: [await this.getCollection(collectionId)]
}, false)
}
}

// non-exported utils
function idRemapper (object) {
return object.id
Expand Down

0 comments on commit b72226d

Please sign in to comment.