Skip to content

Commit

Permalink
Improve chart performance (misskey-dev#7360)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* wip

* wip

* Update chart.ts

* wip

* Improve server performance

* wip

* wip
  • Loading branch information
syuilo authored and Austin Lanari committed May 2, 2021
1 parent 1ec69bc commit d0c61a1
Show file tree
Hide file tree
Showing 32 changed files with 891 additions and 163 deletions.
218 changes: 218 additions & 0 deletions migration/1615965918224-chart-v2.ts

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions migration/1615966519402-chart-v2-2.ts
@@ -0,0 +1,22 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class chartV221615966519402 implements MigrationInterface {
name = 'chartV221615966519402'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "___foo" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "___foo"`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___remote_users"`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___local_users"`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___remote_users"`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___local_users"`);
}

}
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -47,7 +47,7 @@
"@koa/router": "9.0.1",
"@sentry/browser": "5.29.2",
"@sentry/tracing": "5.29.2",
"@sinonjs/fake-timers": "6.0.1",
"@sinonjs/fake-timers": "7.0.2",
"@syuilo/aiscript": "0.11.1",
"@types/bcryptjs": "2.4.2",
"@types/bull": "3.15.0",
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/queue-stats.ts
@@ -1,5 +1,5 @@
import Xev from 'xev';
import { deliverQueue, inboxQueue } from '../queue';
import { deliverQueue, inboxQueue } from '../queue/queues';

const ev = new Xev();

Expand Down
4 changes: 4 additions & 0 deletions src/db/postgre.ts
@@ -1,3 +1,7 @@
// https://github.com/typeorm/typeorm/issues/2400
const types = require('pg').types;
types.setTypeParser(20, Number);

import { createConnection, Logger, getConnection } from 'typeorm';
import config from '../config';
import { entities as charts } from '../services/chart/entities';
Expand Down
1 change: 1 addition & 0 deletions src/global.d.ts
@@ -0,0 +1 @@
type FIXME = any;
88 changes: 88 additions & 0 deletions src/misc/before-shutdown.ts
@@ -0,0 +1,88 @@
// https://gist.github.com/nfantone/1eaa803772025df69d07f4dbf5df7e58

'use strict';

/**
* @callback BeforeShutdownListener
* @param {string} [signalOrEvent] The exit signal or event name received on the process.
*/

/**
* System signals the app will listen to initiate shutdown.
* @const {string[]}
*/
const SHUTDOWN_SIGNALS = ['SIGINT', 'SIGTERM'];

/**
* Time in milliseconds to wait before forcing shutdown.
* @const {number}
*/
const SHUTDOWN_TIMEOUT = 15000;

/**
* A queue of listener callbacks to execute before shutting
* down the process.
* @type {BeforeShutdownListener[]}
*/
const shutdownListeners = [];

/**
* Listen for signals and execute given `fn` function once.
* @param {string[]} signals System signals to listen to.
* @param {function(string)} fn Function to execute on shutdown.
*/
const processOnce = (signals, fn) => {
return signals.forEach(sig => process.once(sig, fn));
};

/**
* Sets a forced shutdown mechanism that will exit the process after `timeout` milliseconds.
* @param {number} timeout Time to wait before forcing shutdown (milliseconds)
*/
const forceExitAfter = timeout => () => {
setTimeout(() => {
// Force shutdown after timeout
console.warn(`Could not close resources gracefully after ${timeout}ms: forcing shutdown`);
return process.exit(1);
}, timeout).unref();
};

/**
* Main process shutdown handler. Will invoke every previously registered async shutdown listener
* in the queue and exit with a code of `0`. Any `Promise` rejections from any listener will
* be logged out as a warning, but won't prevent other callbacks from executing.
* @param {string} signalOrEvent The exit signal or event name received on the process.
*/
async function shutdownHandler(signalOrEvent) {
console.warn(`Shutting down: received [${signalOrEvent}] signal`);

for (const listener of shutdownListeners) {
try {
await listener(signalOrEvent);
} catch (err) {
console.warn(`A shutdown handler failed before completing with: ${err.message || err}`);
}
}

return process.exit(0);
}

/**
* Registers a new shutdown listener to be invoked before exiting
* the main process. Listener handlers are guaranteed to be called in the order
* they were registered.
* @param {BeforeShutdownListener} listener The shutdown listener to register.
* @returns {BeforeShutdownListener} Echoes back the supplied `listener`.
*/
export function beforeShutdown(listener) {
shutdownListeners.push(listener);
return listener;
}

// Register shutdown callback that kills the process after `SHUTDOWN_TIMEOUT` milliseconds
// This prevents custom shutdown handlers from hanging the process indefinitely
processOnce(SHUTDOWN_SIGNALS, forceExitAfter(SHUTDOWN_TIMEOUT));

// Register process shutdown callback
// Will listen to incoming signal events and execute all registered handlers in the stack
processOnce(SHUTDOWN_SIGNALS, shutdownHandler);
23 changes: 1 addition & 22 deletions src/queue/index.ts
@@ -1,4 +1,3 @@
import * as Queue from 'bull';
import * as httpSignature from 'http-signature';

import config from '../config';
Expand All @@ -13,22 +12,7 @@ import { queueLogger } from './logger';
import { DriveFile } from '../models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { IActivity } from '../remote/activitypub/type';

function initializeQueue(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}
import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';

export type InboxJobData = {
activity: IActivity,
Expand All @@ -44,11 +28,6 @@ function renderError(e: Error): any {
};
}

export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');

const deliverLogger = queueLogger.createSubLogger('deliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
Expand Down
18 changes: 18 additions & 0 deletions src/queue/initialize.ts
@@ -0,0 +1,18 @@
import * as Queue from 'bull';
import config from '../config';

export function initialize(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}
7 changes: 7 additions & 0 deletions src/queue/queues.ts
@@ -0,0 +1,7 @@
import config from '../config';
import { initialize as initializeQueue } from './initialize';

export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');
18 changes: 15 additions & 3 deletions src/services/chart/charts/classes/active-users.ts
Expand Up @@ -17,6 +17,18 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
return {};
}

@autobind
protected aggregate(logs: ActiveUsersLog[]): ActiveUsersLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as ActiveUsersLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as ActiveUsersLog['remote']['users']),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<ActiveUsersLog>> {
return {};
Expand All @@ -25,11 +37,11 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
@autobind
public async update(user: User) {
const update: Obj = {
count: 1
users: [user.id]
};

await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id);
});
}
}
22 changes: 22 additions & 0 deletions src/services/chart/charts/classes/drive.ts
Expand Up @@ -27,6 +27,28 @@ export default class DriveChart extends Chart<DriveLog> {
};
}

@autobind
protected aggregate(logs: DriveLog[]): DriveLog {
return {
local: {
totalCount: logs[0].local.totalCount,
totalSize: logs[0].local.totalSize,
incCount: logs.reduce((a, b) => a + b.local.incCount, 0),
incSize: logs.reduce((a, b) => a + b.local.incSize, 0),
decCount: logs.reduce((a, b) => a + b.local.decCount, 0),
decSize: logs.reduce((a, b) => a + b.local.decSize, 0),
},
remote: {
totalCount: logs[0].remote.totalCount,
totalSize: logs[0].remote.totalSize,
incCount: logs.reduce((a, b) => a + b.remote.incCount, 0),
incSize: logs.reduce((a, b) => a + b.remote.incSize, 0),
decCount: logs.reduce((a, b) => a + b.remote.decCount, 0),
decSize: logs.reduce((a, b) => a + b.remote.decSize, 0),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<DriveLog>> {
const [localCount, remoteCount, localSize, remoteSize] = await Promise.all([
Expand Down
11 changes: 11 additions & 0 deletions src/services/chart/charts/classes/federation.ts
Expand Up @@ -20,6 +20,17 @@ export default class FederationChart extends Chart<FederationLog> {
};
}

@autobind
protected aggregate(logs: FederationLog[]): FederationLog {
return {
instance: {
total: logs[0].instance.total,
inc: logs.reduce((a, b) => a + b.instance.inc, 0),
dec: logs.reduce((a, b) => a + b.instance.dec, 0),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<FederationLog>> {
const [total] = await Promise.all([
Expand Down
18 changes: 15 additions & 3 deletions src/services/chart/charts/classes/hashtag.ts
Expand Up @@ -17,6 +17,18 @@ export default class HashtagChart extends Chart<HashtagLog> {
return {};
}

@autobind
protected aggregate(logs: HashtagLog[]): HashtagLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as HashtagLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as HashtagLog['remote']['users']),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<HashtagLog>> {
return {};
Expand All @@ -25,11 +37,11 @@ export default class HashtagChart extends Chart<HashtagLog> {
@autobind
public async update(hashtag: string, user: User) {
const update: Obj = {
count: 1
users: [user.id]
};

await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id, hashtag);
}, hashtag);
}
}
44 changes: 44 additions & 0 deletions src/services/chart/charts/classes/instance.ts
Expand Up @@ -36,6 +36,50 @@ export default class InstanceChart extends Chart<InstanceLog> {
};
}

@autobind
protected aggregate(logs: InstanceLog[]): InstanceLog {
return {
requests: {
failed: logs.reduce((a, b) => a + b.requests.failed, 0),
succeeded: logs.reduce((a, b) => a + b.requests.succeeded, 0),
received: logs.reduce((a, b) => a + b.requests.received, 0),
},
notes: {
total: logs[0].notes.total,
inc: logs.reduce((a, b) => a + b.notes.inc, 0),
dec: logs.reduce((a, b) => a + b.notes.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.notes.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.notes.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.notes.diffs.normal, 0),
},
},
users: {
total: logs[0].users.total,
inc: logs.reduce((a, b) => a + b.users.inc, 0),
dec: logs.reduce((a, b) => a + b.users.dec, 0),
},
following: {
total: logs[0].following.total,
inc: logs.reduce((a, b) => a + b.following.inc, 0),
dec: logs.reduce((a, b) => a + b.following.dec, 0),
},
followers: {
total: logs[0].followers.total,
inc: logs.reduce((a, b) => a + b.followers.inc, 0),
dec: logs.reduce((a, b) => a + b.followers.dec, 0),
},
drive: {
totalFiles: logs[0].drive.totalFiles,
totalUsage: logs[0].drive.totalUsage,
incFiles: logs.reduce((a, b) => a + b.drive.incFiles, 0),
incUsage: logs.reduce((a, b) => a + b.drive.incUsage, 0),
decFiles: logs.reduce((a, b) => a + b.drive.decFiles, 0),
decUsage: logs.reduce((a, b) => a + b.drive.decUsage, 0),
},
};
}

@autobind
protected async fetchActual(group: string): Promise<DeepPartial<InstanceLog>> {
const [
Expand Down
11 changes: 11 additions & 0 deletions src/services/chart/charts/classes/network.ts
Expand Up @@ -15,6 +15,17 @@ export default class NetworkChart extends Chart<NetworkLog> {
return {};
}

@autobind
protected aggregate(logs: NetworkLog[]): NetworkLog {
return {
incomingRequests: logs.reduce((a, b) => a + b.incomingRequests, 0),
outgoingRequests: logs.reduce((a, b) => a + b.outgoingRequests, 0),
totalTime: logs.reduce((a, b) => a + b.totalTime, 0),
incomingBytes: logs.reduce((a, b) => a + b.incomingBytes, 0),
outgoingBytes: logs.reduce((a, b) => a + b.outgoingBytes, 0),
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<NetworkLog>> {
return {};
Expand Down

0 comments on commit d0c61a1

Please sign in to comment.