diff --git a/CHANGELOG.md b/CHANGELOG.md index a97112c5..3fa7d938 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### 1.4.0 + +- Adds support for whitelisting a channel such that a whitelisted channel will be exempted from requirements check when signing up for the YPP program. Adds `POST /channels/whitelist` endpoint to whitelist a channel/s & `DELETE /channels/whitelist/{ytChannelHandle}` endpoint to remove a channel from whitelist. + ### 1.3.0 - Integrates ElasticSearch alerting feature based on the filtration criteria set on the ingested logs. diff --git a/package.json b/package.json index f36bb16b..722279a8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "youtube-sync", - "version": "1.3.0", + "version": "1.4.0", "license": "MIT", "scripts": { "postpack": "rm -f oclif.manifest.json", diff --git a/src/infrastructure/index.ts b/src/infrastructure/index.ts index aeb08360..8454212c 100644 --- a/src/infrastructure/index.ts +++ b/src/infrastructure/index.ts @@ -1,5 +1,5 @@ import * as aws from '@pulumi/aws' -import { resourcePrefix, Stats, YtChannel, YtUser, YtVideo } from '../types/youtube' +import { resourcePrefix, Stats, WhitelistChannel, YtChannel, YtUser, YtVideo } from '../types/youtube' const nameof = (name: keyof T) => name @@ -107,7 +107,20 @@ const statsTable = new aws.dynamodb.Table('stats', { billingMode: 'PAY_PER_REQUEST', }) +const whitelistChannelsTable = new aws.dynamodb.Table('whitelistChannels', { + name: `${resourcePrefix}whitelistChannels`, + hashKey: nameof('channelHandle'), + attributes: [ + { + name: nameof('channelHandle'), + type: 'S', + }, + ], + billingMode: 'PAY_PER_REQUEST', +}) + export const usersTableArn = userTable.arn export const channelsTableArn = channelsTable.arn export const videosTableArn = videosTable.arn export const statsTableArn = statsTable.arn +export const whitelistChannelsTableArn = whitelistChannelsTable.arn diff --git a/src/repository/DynamodbService.ts b/src/repository/DynamodbService.ts index aeeed02d..8118cc07 100644 --- a/src/repository/DynamodbService.ts +++ b/src/repository/DynamodbService.ts @@ -5,12 +5,14 @@ import { ChannelsRepository, ChannelsService } from './channel' import { StatsRepository } from './stats' import { UsersRepository, UsersService } from './user' import { VideosRepository, VideosService } from './video' +import { WhitelistChannelsRepository } from './whitelistChannels' interface IDynamodbClient { channels: ChannelsRepository users: UsersRepository videos: VideosRepository stats: StatsRepository + whitelistChannels: WhitelistChannelsRepository } const DynamodbClient = { @@ -20,6 +22,7 @@ const DynamodbClient = { users: new UsersRepository(tablePrefix), videos: new VideosRepository(tablePrefix), stats: new StatsRepository(tablePrefix), + whitelistChannels: new WhitelistChannelsRepository(tablePrefix), } }, } diff --git a/src/repository/whitelistChannels.ts b/src/repository/whitelistChannels.ts new file mode 100644 index 00000000..1dc7df1a --- /dev/null +++ b/src/repository/whitelistChannels.ts @@ -0,0 +1,103 @@ +import AsyncLock from 'async-lock' +import * as dynamoose from 'dynamoose' +import { ConditionInitializer } from 'dynamoose/dist/Condition' +import { AnyItem } from 'dynamoose/dist/Item' +import { Query, QueryResponse, Scan, ScanResponse } from 'dynamoose/dist/ItemRetriever' +import { DYNAMO_MODEL_OPTIONS, IRepository, mapTo } from '.' +import { ResourcePrefix, WhitelistChannel } from '../types/youtube' + +function whitelistChannelsModel(tablePrefix: ResourcePrefix) { + const schema = new dynamoose.Schema( + { + channelHandle: { + type: String, + hashKey: true, + }, + }, + { + saveUnknown: false, + timestamps: { + createdAt: { + createdAt: { + type: { + value: Date, + settings: { + storage: 'iso', + }, + }, + }, + }, + }, + } + ) + return dynamoose.model(`${tablePrefix}whitelistChannels`, schema, DYNAMO_MODEL_OPTIONS) +} + +export class WhitelistChannelsRepository implements IRepository { + private model + + // lock any updates on whitelistChannels table + private readonly ASYNC_LOCK_ID = 'whitelistChannels' + private asyncLock: AsyncLock = new AsyncLock() + + constructor(tablePrefix: ResourcePrefix) { + this.model = whitelistChannelsModel(tablePrefix) + } + + async upsertAll(): Promise { + throw new Error('Not implemented') + } + + async scan(init: ConditionInitializer, f: (q: Scan) => Scan): Promise { + return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => { + let lastKey = undefined + const results = [] + do { + let scannedBatch: ScanResponse = await f(this.model.scan(init)) + .startAt(lastKey as any) + .exec() + let batchResult = scannedBatch.map((b) => mapTo(b)) + results.push(...batchResult) + lastKey = scannedBatch.lastKey + } while (lastKey) + return results + }) + } + + async get(channelId: string): Promise { + return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => { + const result = await this.model.get(channelId) + return result ? mapTo(result) : undefined + }) + } + + async save(model: WhitelistChannel): Promise { + return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => { + const result = await this.model.update(model) + return mapTo(result) + }) + } + + async delete(channelId: string): Promise { + return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => { + await this.model.delete(channelId) + return + }) + } + + async query(init: ConditionInitializer, f: (q: Query) => Query) { + return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => { + let lastKey = undefined + const results = [] + do { + let queriedBatch: QueryResponse = await f(this.model.query(init)) + .startAt(lastKey as any) + .exec() + let batchResult = queriedBatch.map((b) => mapTo(b)) + results.push(...batchResult) + lastKey = queriedBatch.lastKey + } while (lastKey) + return results + }) + } +} diff --git a/src/services/httpApi/api-spec.json b/src/services/httpApi/api-spec.json index 06336b97..a840eee1 100644 --- a/src/services/httpApi/api-spec.json +++ b/src/services/httpApi/api-spec.json @@ -357,6 +357,94 @@ ] } }, + "/channels/whitelist": { + "post": { + "operationId": "ChannelsController_addWhitelistChannels", + "summary": "", + "description": "Whitelist a given youtube channel/s by it's channel handle", + "parameters": [ + { + "name": "authorization", + "required": true, + "in": "header", + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "string" + } + } + } + } + }, + "responses": { + "default": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/WhitelistChannelDto" + } + } + } + } + } + }, + "tags": [ + "channels" + ] + } + }, + "/channels/whitelist/{channelHandle}": { + "delete": { + "operationId": "ChannelsController_deleteWhitelistedChannel", + "summary": "", + "description": "Remove a whitelisted channel by it's channel handle", + "parameters": [ + { + "name": "authorization", + "required": true, + "in": "header", + "schema": { + "type": "string" + } + }, + { + "name": "channelHandle", + "required": true, + "in": "path", + "schema": { + "type": "string" + } + } + ], + "responses": { + "default": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WhitelistChannelDto" + } + } + } + } + }, + "tags": [ + "channels" + ] + } + }, "/channels/induction/requirements": { "get": { "operationId": "ChannelsController_inductionRequirements", @@ -882,6 +970,17 @@ "joystreamVideo" ] }, + "WhitelistChannelDto": { + "type": "object", + "properties": { + "channelHandle": { + "type": "string" + } + }, + "required": [ + "channelHandle" + ] + }, "ChannelInductionRequirementsDto": { "type": "object", "properties": { diff --git a/src/services/httpApi/controllers/channels.ts b/src/services/httpApi/controllers/channels.ts index 7db27ceb..0eb3484e 100644 --- a/src/services/httpApi/controllers/channels.ts +++ b/src/services/httpApi/controllers/channels.ts @@ -2,6 +2,7 @@ import { BadRequestException, Body, Controller, + Delete, Get, Headers, Inject, @@ -31,6 +32,7 @@ import { UserDto, VerifyChannelDto, VideoDto, + WhitelistChannelDto, } from '../dtos' @Controller('channels') @@ -310,6 +312,56 @@ export class ChannelsController { return result } + @Post('/whitelist') + @ApiResponse({ type: WhitelistChannelDto, isArray: true }) + @ApiOperation({ description: `Whitelist a given youtube channel/s by it's channel handle` }) + async addWhitelistChannels( + @Headers('authorization') authorizationHeader: string, + @Body(new ParseArrayPipe({ items: WhitelistChannelDto, whitelist: true })) channels: WhitelistChannelDto[] + ) { + const yppOwnerKey = authorizationHeader ? authorizationHeader.split(' ')[1] : '' + // TODO: fix this YT_SYNCH__HTTP_API__OWNER_KEY config value + if (yppOwnerKey !== process.env.YT_SYNCH__HTTP_API__OWNER_KEY) { + throw new UnauthorizedException('Invalid YPP owner key') + } + + try { + for (const { channelHandle } of channels) { + await this.dynamodbService.repo.whitelistChannels.save({ channelHandle, createdAt: new Date() }) + } + } catch (error) { + const message = error instanceof Error ? error.message : error + throw new NotFoundException(message) + } + } + + @Delete('/whitelist/:channelHandle') + @ApiResponse({ type: WhitelistChannelDto }) + @ApiOperation({ description: `Remove a whitelisted channel by it's channel handle` }) + async deleteWhitelistedChannel( + @Headers('authorization') authorizationHeader: string, + @Param('channelHandle') channelHandle: string + ) { + const yppOwnerKey = authorizationHeader ? authorizationHeader.split(' ')[1] : '' + // TODO: fix this YT_SYNCH__HTTP_API__OWNER_KEY config value + if (yppOwnerKey !== process.env.YT_SYNCH__HTTP_API__OWNER_KEY) { + throw new UnauthorizedException('Invalid YPP owner key') + } + + try { + const whitelistChannel = await this.dynamodbService.repo.whitelistChannels.get(channelHandle) + + if (!whitelistChannel) { + throw new NotFoundException(`Channel with handle ${channelHandle} is not whitelisted`) + } + + await this.dynamodbService.repo.whitelistChannels.delete(channelHandle) + } catch (error) { + const message = error instanceof Error ? error.message : error + throw new NotFoundException(message) + } + } + @Get('/induction/requirements') @ApiResponse({ type: ChannelInductionRequirementsDto }) @ApiOperation({ description: 'Retrieves Youtube Partner program induction requirements' }) diff --git a/src/services/httpApi/controllers/users.ts b/src/services/httpApi/controllers/users.ts index 025884bc..e1626863 100644 --- a/src/services/httpApi/controllers/users.ts +++ b/src/services/httpApi/controllers/users.ts @@ -51,8 +51,16 @@ export class UsersController { ) } - // get verified channel from user - await this.youtube.getVerifiedChannel(user) + // get channel from user + const channel = await this.youtube.getChannel(user) + + // check if the channel is whitelisted + const whitelistedChannel = await this.dynamodbService.repo.whitelistChannels.get(channel.customUrl) + + if (!whitelistedChannel) { + // verify given channel + await this.youtube.verifyChannel(channel) + } // save user await this.dynamodbService.users.save(user) diff --git a/src/services/httpApi/dtos.ts b/src/services/httpApi/dtos.ts index e9af8af3..a6222326 100644 --- a/src/services/httpApi/dtos.ts +++ b/src/services/httpApi/dtos.ts @@ -8,6 +8,7 @@ import { IsOptional, IsString, IsUrl, + Matches, ValidateIf, ValidateNested, } from 'class-validator' @@ -199,3 +200,9 @@ export class VerifyChannelDto { @IsNumber() @ApiProperty({ required: true }) joystreamChannelId: number @IsBoolean() @ApiProperty({ required: true }) isVerified: boolean } + +export class WhitelistChannelDto { + @Matches(/^@/, { message: 'The channel handle should start with a "@"' }) + @ApiProperty({ required: true }) + channelHandle: string +} diff --git a/src/services/storage-node/api.ts b/src/services/storage-node/api.ts index 4c970669..f5df3178 100644 --- a/src/services/storage-node/api.ts +++ b/src/services/storage-node/api.ts @@ -1,5 +1,5 @@ import { createType } from '@joystream/types' -import axios, { AxiosError } from 'axios' +import axios from 'axios' import BN from 'bn.js' import FormData from 'form-data' import fs from 'fs' @@ -69,11 +69,15 @@ export class StorageNodeApi { }, }) } catch (error) { - const msg: string = (error as AxiosError).response?.data?.message - this.logger.error(msg) - if (msg.includes(`Data object ${dataObjectId} has already been accepted by storage node`)) { - // No need to throw an error, we can continue with the next asset - continue + if (axios.isAxiosError(error) && error.response) { + const storageNodeUrl = error.config?.url + const { status, data } = error.response + this.logger.error(`${storageNodeUrl} - errorCode: ${status}, msg: ${data?.message}`) + + if (data?.message?.includes(`Data object ${dataObjectId} has already been accepted by storage node`)) { + // No need to throw an error, we can continue with the next asset + continue + } } throw error diff --git a/src/services/syncProcessing/ContentUploadService.ts b/src/services/syncProcessing/ContentUploadService.ts index a664f3e5..409715f8 100644 --- a/src/services/syncProcessing/ContentUploadService.ts +++ b/src/services/syncProcessing/ContentUploadService.ts @@ -117,7 +117,8 @@ export class ContentUploadService { videoId: video.resourceId, channelId: video.joystreamChannelId, }) - } catch (err) { + } catch (error) { + const err = (error as Error).message this.logger.error(`Got error uploading assets for video`, { videoId: video.resourceId, err }) // Update video state and save to DB await this.dynamodbService.videos.updateState(video, 'UploadFailed') diff --git a/src/services/youtube/api.ts b/src/services/youtube/api.ts index 48b6a67f..bc9c57c7 100644 --- a/src/services/youtube/api.ts +++ b/src/services/youtube/api.ts @@ -18,7 +18,7 @@ import Schema$Channel = youtube_v3.Schema$Channel export interface IYoutubeApi { getUserFromCode(code: string, youtubeRedirectUri: string): Promise getChannel(user: Pick): Promise - getVerifiedChannel(user: YtUser): Promise + verifyChannel(channel: YtChannel): Promise getVideos(channel: YtChannel, top: number): Promise getAllVideos(channel: YtChannel, max: number): Promise downloadVideo(videoUrl: string, outPath: string): ReturnType @@ -129,9 +129,7 @@ class YoutubeClient implements IYoutubeApi { return channel } - async getVerifiedChannel(user: YtUser): Promise { - const channel = await this.getChannel(user) - + async verifyChannel(channel: YtChannel): Promise { const { minimumSubscribersCount, minimumVideoCount, minimumChannelAgeHours, minimumVideoAgeHours } = this.config.creatorOnboardingRequirements const errors: YoutubeApiError[] = [] @@ -271,6 +269,7 @@ class YoutubeClient implements IYoutubeApi { description: channel.snippet?.description, title: channel.snippet?.title, userId: user.id, + customUrl: channel.snippet?.customUrl, userAccessToken: user.accessToken, userRefreshToken: user.refreshToken, thumbnails: { @@ -354,7 +353,7 @@ class QuotaTrackingClient implements IYoutubeApi { return this.decorated.getUserFromCode(code, youtubeRedirectUri) } - async getVerifiedChannel(user: YtUser) { + async verifyChannel(channel: YtChannel) { // ensure have some left api quota if (!(await this.canCallYoutube('signup'))) { throw new YoutubeApiError( @@ -364,7 +363,7 @@ class QuotaTrackingClient implements IYoutubeApi { } try { - const verifiedChannel = await this.decorated.getVerifiedChannel(user) + const verifiedChannel = await this.decorated.verifyChannel(channel) // increase used quota count await this.increaseUsedQuota({ signupQuotaIncrement: 1 }) return verifiedChannel diff --git a/src/types/youtube.ts b/src/types/youtube.ts index adcf91c2..2b3208c9 100644 --- a/src/types/youtube.ts +++ b/src/types/youtube.ts @@ -11,6 +11,9 @@ export class YtChannel { // ID of the user that owns the channel userId: string + // Youtube channel custom URL. Also known as youtube channel handle + customUrl: string + // user provided email email: string @@ -222,6 +225,11 @@ export class Stats { partition = 'stats' } +export class WhitelistChannel { + channelHandle: string + createdAt: Date +} + export const getImages = (channel: YtChannel) => { return [ ...urlAsArray(channel.thumbnails.default),