Skip to content

Commit

Permalink
Viewers federation protocol v2
Browse files Browse the repository at this point in the history
More efficient than the current one where instance is not fast enough to
send all viewers if a video becomes popular

The new protocol can be enabled by setting env
USE_VIEWERS_FEDERATION_V2='true'

Introduce a result field in View activity that contains the number of
viewers. This field is used by the origin instance to send the total
viewers on the video to remote instances. The difference with the
current protocol is that we don't have to send viewers individually to
remote instances.

There are 4 cases:
 * View activity from federation on Remote Video -> instance replaces
   all current viewers by a new viewer that contains the result counter
 * View activity from federation on Local Video -> instance adds the
   viewer without considering the result counter
 * Local view on Remote Video -> instance adds the viewer and send it to
   the origin instance
 * Local view on Local Video -> instance adds the viewer

Periodically PeerTube cleanups expired viewers. On local videos, the
instance sends to remote instances a View activity with the result
counter so they can update their viewers counter for that particular
video
  • Loading branch information
Chocobozzz committed Dec 20, 2023
1 parent a73f476 commit b4f4432
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 147 deletions.
5 changes: 5 additions & 0 deletions packages/models/src/activitypub/activity.ts
Expand Up @@ -116,6 +116,11 @@ export interface ActivityView extends BaseActivity {

// If sending a "viewer" event
expires?: string
result?: {
type: 'InteractionCounter'
interactionType: 'WatchAction'
userInteractionCount: number
}
}

export interface ActivityDislike extends BaseActivity {
Expand Down
1 change: 1 addition & 0 deletions packages/models/src/activitypub/objects/video-object.ts
Expand Up @@ -18,6 +18,7 @@ export interface VideoObject {
licence: ActivityIdentifierObject
language: ActivityIdentifierObject
subtitleLanguage: ActivityIdentifierObject[]

views: number

sensitive: boolean
Expand Down
4 changes: 4 additions & 0 deletions packages/node-utils/src/env.ts
Expand Up @@ -56,3 +56,7 @@ export function isProdInstance () {
export function getAppNumber () {
return process.env.NODE_APP_INSTANCE || ''
}

export function isUsingViewersFederationV2 () {
return process.env.USE_VIEWERS_FEDERATION_V2 === 'true'
}
212 changes: 125 additions & 87 deletions packages/tests/src/api/views/video-views-counter.ts
Expand Up @@ -21,133 +21,171 @@ describe('Test video views/viewers counters', function () {
}
}

before(async function () {
this.timeout(120000)
function runTests () {
describe('Test views counter on VOD', function () {
let videoUUID: string

servers = await prepareViewsServers()
})
before(async function () {
this.timeout(120000)

describe('Test views counter on VOD', function () {
let videoUUID: string
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoUUID = uuid

before(async function () {
this.timeout(120000)
await waitJobs(servers)
})

const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoUUID = uuid
it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)

await waitJobs(servers)
})
await checkCounter('views', videoUUID, 0)
})

it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)

await checkCounter('views', videoUUID, 0)
})
await checkCounter('views', videoUUID, 1)
})

it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)

await checkCounter('views', videoUUID, 1)
})
await checkCounter('views', videoUUID, 2)
})

it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)

await checkCounter('views', videoUUID, 2)
await checkCounter('views', videoUUID, 3)
})
})

it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand

await checkCounter('views', videoUUID, 3)
})
})
before(async function () {
this.timeout(240000);

describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})

before(async function () {
this.timeout(240000);
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)

({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})

it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)

await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
for (let i = 0; i < 3; i++) {
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })

it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
await wait(1000)
}

await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await waitJobs(servers)

await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)

await processViewsBuffer(servers)
await processViewsBuffer(servers)

await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})

it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(30000)
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(45000)

await wait(12000)
await waitJobs(servers)
let error = false

await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
do {
try {
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)

await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
})
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)

error = false
await wait(2500)
} catch {
error = true
}
} while (error)
})

it('Should view on a remote and on local and display appropriate views/viewers', async function () {
this.timeout(30000)

await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })

it('Should view on a remote and on local and display 2 viewers and 3 views', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })

await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await wait(3000) // Throttled federation
await waitJobs(servers)

await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 3)

await waitJobs(servers)
await processViewsBuffer(servers)

await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 2)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 4)
})

await processViewsBuffer(servers)
after(async function () {
await stopFfmpeg(command)
})
})
}

describe('Federation V1', function () {

before(async function () {
this.timeout(120000)

await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 3)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false })
})

runTests()

after(async function () {
await stopFfmpeg(command)
await cleanupTests(servers)
})
})

after(async function () {
await cleanupTests(servers)
describe('Federation V2', function () {

before(async function () {
this.timeout(120000)

servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true })
})

runTests()

after(async function () {
await cleanupTests(servers)
})
})
})
13 changes: 11 additions & 2 deletions packages/tests/src/shared/views.ts
Expand Up @@ -30,8 +30,17 @@ async function processViewsBuffer (servers: PeerTubeServer[]) {
await waitJobs(servers)
}

async function prepareViewsServers () {
const servers = await createMultipleServers(2)
async function prepareViewsServers (options: {
viewersFederationV2?: boolean
viewExpiration?: string // default 1 second
} = {}) {
const { viewExpiration = '1 second' } = options

const env = options?.viewersFederationV2 === true
? { USE_VIEWERS_FEDERATION_V2: 'true' }
: undefined

const servers = await createMultipleServers(2, { views: { videos: { ip_view_expiration: viewExpiration } } }, { env })
await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers)

Expand Down
8 changes: 7 additions & 1 deletion server/core/helpers/activity-pub-utils.ts
Expand Up @@ -196,11 +196,17 @@ const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string
uuid: 'sc:identifier'
}),

View: buildContext({
WatchAction: 'sc:WatchAction',
InteractionCounter: 'sc:InteractionCounter',
interactionType: 'sc:interactionType',
userInteractionCount: 'sc:userInteractionCount'
}),

Collection: buildContext(),
Follow: buildContext(),
Reject: buildContext(),
Accept: buildContext(),
View: buildContext(),
Announce: buildContext(),
Comment: buildContext(),
Delete: buildContext(),
Expand Down
1 change: 0 additions & 1 deletion server/core/helpers/debounce.ts
Expand Up @@ -9,7 +9,6 @@ export function Debounce (config: { timeoutMS: number }) {

timeoutRef = setTimeout(() => {
original.apply(this, args)

}, config.timeoutMS)
}
}
Expand Down
26 changes: 21 additions & 5 deletions server/core/lib/activitypub/process/process-view.ts
Expand Up @@ -28,15 +28,31 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
allowRefresh: false
})

const viewerExpires = activity.expires
? new Date(activity.expires)
: undefined

await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires })
await VideoViewsManager.Instance.processRemoteView({
video,
viewerId: activity.id,

viewerExpires: activity.expires
? new Date(activity.expires)
: undefined,
viewerResultCounter: getViewerResultCounter(activity)
})

if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender
const exceptions = [ byActor ]
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
}
}

// Viewer protocol V2
function getViewerResultCounter (activity: ActivityView) {
const result = activity.result

if (!activity.expires || result?.interactionType !== 'WatchAction' || result?.type !== 'InteractionCounter') return undefined

const counter = parseInt(result.userInteractionCount + '')
if (isNaN(counter)) return undefined

return counter
}

0 comments on commit b4f4432

Please sign in to comment.