Skip to content

Commit

Permalink
⚡ run subscriptions job in batches (#766)
Browse files Browse the repository at this point in the history
* 🔊 log process error

* ✨ measure api performance, add statistics endpoint

* log subscriptions job progress

* 🚚 rename plugins to not match module name

* ⚡️ run subscriptions job in batches
  • Loading branch information
moisout committed Jul 23, 2021
1 parent 193c1ee commit 1d88911
Show file tree
Hide file tree
Showing 24 changed files with 66 additions and 38 deletions.
2 changes: 1 addition & 1 deletion client/components/About.vue
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ import BadgeButton from '@/components/buttons/BadgeButton.vue';
import InvidiousLicense from '@/components/licenses/Invidious.vue';
import { computed, defineComponent, onMounted, ref } from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'About',
Expand Down
2 changes: 1 addition & 1 deletion client/components/Comment.vue
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ import BadgeButton from '@/components/buttons/BadgeButton.vue';
import Invidious from '@/plugins/services/invidious';
import { defineComponent, ref, useRoute } from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useImgProxy } from '@/plugins/proxy';
export default defineComponent({
Expand Down
2 changes: 1 addition & 1 deletion client/components/SearchAutoComplete.vue
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<script lang="ts">
import { defineComponent, ref, watch } from '@nuxtjs/composition-api';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useAccessor } from '@/store';
export default defineComponent({
Expand Down
2 changes: 1 addition & 1 deletion client/components/buttons/SubscribeButton.vue
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<script lang="ts">
import { defineComponent, onMounted, ref } from '@nuxtjs/composition-api';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useAccessor } from '@/store/index';
export default defineComponent({
Expand Down
2 changes: 1 addition & 1 deletion client/components/history/HistoryList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import { defineComponent } from '@nuxtjs/composition-api';
import humanizeDuration from 'humanize-duration';
import DeleteIcon from 'vue-material-design-icons/Delete.vue';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useAccessor } from '@/store';
import BadgeButton from '@/components/buttons/BadgeButton.vue';
Expand Down
2 changes: 1 addition & 1 deletion client/components/licenses/Invidious.vue
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<script lang="ts">
import { defineComponent, ref, useFetch } from '@nuxtjs/composition-api';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'InvidiousLicense',
Expand Down
2 changes: 1 addition & 1 deletion client/components/popup/SubscriptionImport.vue
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ import SubscriptionConverter from '@/plugins/services/subscriptionConverter';
import Spinner from '@/components/Spinner.vue';
import '@/assets/styles/popup.scss';
import { computed, defineComponent, ref } from '@nuxtjs/composition-api';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useAccessor } from '@/store/index';
class ChannelDto {
Expand Down
6 changes: 3 additions & 3 deletions client/components/videoplayer/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import { commons } from '@/plugins/commons';
import { SponsorBlock } from '@/plugins/services/sponsorBlock';
import { SponsorBlockSegmentsDto } from '@/plugins/shared';
import { useAccessor } from '@/store';
import { useFormatting } from '@/plugins/formatting';
import { useAxios } from '@/plugins/axiosPlugin';
import { useImgProxy } from '@/plugins/proxy';
import { MediaMetadataHelper } from './mediaMetadata';
import { calculateSeekPercentage, matchSeekProgressPercentage, seekbarFunctions } from './seekbar';
import { parseChapters } from './chapters';
import { destroyInstance, initializeHlsStream, isHlsNative, isHlsSupported } from './hlsHelper';
import { useFormatting } from '~/plugins/formatting';
import { useAxios } from '~/plugins/axios';
import { useImgProxy } from '~/plugins/proxy';

export const videoPlayerSetup = (props: any, emit: Function) => {
const accessor = useAccessor();
Expand Down
2 changes: 1 addition & 1 deletion client/components/watch/VideoLoadingTemplate.vue
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<script lang="ts">
import { computed, defineComponent } from '@nuxtjs/composition-api';
import Spinner from '@/components/Spinner.vue';
import { useImgProxy } from '~/plugins/proxy';
import { useImgProxy } from '@/plugins/proxy';
export default defineComponent({
name: 'VideoLoadingTemplate',
Expand Down
2 changes: 1 addition & 1 deletion client/pages/history.vue
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import SectionTitle from '@/components/SectionTitle.vue';
import RestartOffIcon from 'vue-material-design-icons/RestartOff.vue';
import Pagination from '@/components/pagination/Pagination.vue';
import HistoryList from '@/components/history/HistoryList.vue';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useAccessor } from '@/store';
import BadgeButton from '@/components/buttons/BadgeButton.vue';
import SmallSearchBox from '@/components/SmallSearchBox.vue';
Expand Down
2 changes: 1 addition & 1 deletion client/pages/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import ViewTubeApi from '@/plugins/services/viewTubeApi';
import BadgeButton from '@/components/buttons/BadgeButton.vue';
import { defineComponent, ref, useFetch, useMeta } from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'Home',
Expand Down
4 changes: 2 additions & 2 deletions client/pages/playlist.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ import EyeIcon from 'vue-material-design-icons/EyeOutline.vue';
import EyeClosedIcon from 'vue-material-design-icons/EyeOffOutline.vue';
import CountIcon from 'vue-material-design-icons/Counter.vue';
import CalendarIcon from 'vue-material-design-icons/CalendarClock.vue';
import { useAxios } from '~/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useImgProxy } from '@/plugins/proxy';
import { useAccessor } from '~/store';
import { useImgProxy } from '~/plugins/proxy';
export default defineComponent({
name: 'Playlist',
Expand Down
2 changes: 1 addition & 1 deletion client/pages/profile/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ import {
} from '@nuxtjs/composition-api';
import HistoryList from '@/components/history/HistoryList.vue';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'Profile',
Expand Down
2 changes: 1 addition & 1 deletion client/pages/results.vue
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import {
watch
} from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'Search',
Expand Down
2 changes: 1 addition & 1 deletion client/pages/subscriptions/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import {
watch
} from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
export default defineComponent({
name: 'Subscriptions',
Expand Down
2 changes: 1 addition & 1 deletion client/pages/subscriptions/manage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import {
watch
} from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import SmallSearchBox from '@/components/SmallSearchBox.vue';
export default defineComponent({
Expand Down
2 changes: 1 addition & 1 deletion client/pages/watch.vue
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ import {
watch
} from '@nuxtjs/composition-api';
import { useAccessor } from '@/store';
import { useAxios } from '@/plugins/axios';
import { useAxios } from '@/plugins/axiosPlugin';
import { useImgProxy } from '@/plugins/proxy';
import VideoLoadingTemplate from '@/components/watch/VideoLoadingTemplate.vue';
import { Result } from 'ytpl';
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion client/store/instances.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getterTree, mutationTree, actionTree } from 'typed-vuex';
import { commons } from '~/plugins/commons';
import { commons } from '@/plugins/commons';

export const state = () => ({
currentInstance: '' as string,
Expand Down
2 changes: 1 addition & 1 deletion nuxt.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ const config: NuxtConfig = {
'@/plugins/directives/index',
'@/plugins/formatting',
'@/plugins/shared',
{ src: '~/plugins/vue-datepicker', mode: 'client' },
{ src: '@/plugins/vueDatepickerPlugin', mode: 'client' },
{ src: '@/plugins/localStorage', mode: 'client' }
],

Expand Down
31 changes: 25 additions & 6 deletions server/user/subscriptions/subscriptions-job.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import humanizeDuration from 'humanize-duration';
// import { Common } from 'server/core/common';
import Consola from 'consola';
import { Common } from 'server/core/common';
import { Job } from 'bull';

export const runSubscriptionsJob = async (
uniqueChannelIds: Array<string>
uniqueChannelIds: Array<string>,
job: Job = null

): Promise<{
channelResultArray: Array<ChannelBasicInfoDto>;
videoResultArray: Array<VideoBasicInfoDto>;
Expand All @@ -30,16 +33,32 @@ export const runSubscriptionsJob = async (
});
};

const channelIdBatches = [];
uniqueChannelIds = [].concat(...uniqueChannelIds);

while (uniqueChannelIds.length) {
channelIdBatches.push(uniqueChannelIds.splice(0, 100));
}

console.log(channelIdBatches.length);

let i = 0;

await uniqueChannelIds
.reduce(async (previousPromise: Promise<void>, nextString: string) => {
await channelIdBatches
.reduce(async (previousPromise: Promise<void>, nextBatch: Array<string>) => {
await previousPromise;
console.log(`${i} of ${uniqueChannelIds.length}`);
const jobProgress = Math.floor((i / channelIdBatches.length) * 100);
await job.progress(jobProgress);
i++;
return getFeedPromise(nextString);
return Promise.allSettled(
nextBatch.map(val => {
return getFeedPromise(val);
})
);
}, Promise.resolve())
.catch(() => {});
.catch(error => {
console.log('job error', error);
});

if (videoRawResultArray.length > 0) {
videoResultArray = videoRawResultArray.reduce(
Expand Down
17 changes: 14 additions & 3 deletions server/user/subscriptions/subscriptions.processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { OnQueueActive, OnQueueCompleted, OnQueueError, Process, Processor } from '@nestjs/bull';
import {
OnQueueActive,
OnQueueCompleted,
OnQueueError,
OnQueueProgress,
Process,
Processor
} from '@nestjs/bull';
import { InjectModel } from '@nestjs/mongoose';
import { Job } from 'bull';
import { Model } from 'mongoose';
Expand Down Expand Up @@ -28,9 +35,9 @@ export class SubscriptionsProcessor {
const uniqueChannelIds = [...new Set(channelIds)];
let subscriptionResults = null;
try {
subscriptionResults = await runSubscriptionsJob(uniqueChannelIds);
subscriptionResults = await runSubscriptionsJob(uniqueChannelIds, job);
} catch (error) {
console.log(error);
throw new Error(error);
}

if (subscriptionResults) {
Expand Down Expand Up @@ -62,6 +69,10 @@ export class SubscriptionsProcessor {
return null;
}

@OnQueueProgress()
onProgress(job: Job, progress: number) {
console.log(`${job.name}: ${progress}%`);
}
@OnQueueError()
onError(error: Error) {
console.log(error);
Expand Down
12 changes: 5 additions & 7 deletions server/user/subscriptions/subscriptions.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ export class SubscriptionsService {
private notificationsService: NotificationsService
) {}

@Cron(CronExpression.EVERY_30_MINUTES)
@Cron(CronExpression.EVERY_HOUR)
// @Cron(new Date(Date.now() + 60 * 1000))
async collectSubscriptionsJob(): Promise<void> {
const userSubscriptions = await this.subscriptionModel.find().lean(true).exec();

this.subscriptionsQueue.add(
{
userSubscriptions
},
{}
);
this.subscriptionsQueue.add({
userSubscriptions
});
}

async saveChannelBasicInfo(channel: ChannelBasicInfoDto): Promise<ChannelBasicInfoDto | null> {
Expand Down

0 comments on commit 1d88911

Please sign in to comment.