Skip to content

Commit

Permalink
feat(startup): Add ticket system
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco Crespi committed Mar 24, 2020
1 parent bda4f86 commit cf01bc6
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 34 deletions.
10 changes: 5 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ const main = async () => {

await client.init();

console.log('-------------------------------------');
console.log('Waiting for start ticket...');
console.log('-------------------------------------');

await client.waitForStartupTicket();

console.log('-------------------------------------');
console.log('Connecting to discord...');
console.log('-------------------------------------');
Expand Down
10 changes: 10 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ export class IMClient extends Client {
await this.rabbitmq.init();
}

public async waitForStartupTicket() {
const start = process.uptime();
const interval = setInterval(
() => console.log(`Waiting for ticket since ${Math.floor(process.uptime() - start)} seconds...`),
10000
);
await this.rabbitmq.waitForStartupTicket();
clearInterval(interval);
}

private async onClientReady(): Promise<void> {
if (this.hasStarted) {
console.error('BOT HAS ALREADY STARTED, IGNORING EXTRA READY EVENT');
Expand Down
2 changes: 1 addition & 1 deletion src/framework/services/Commands.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { captureException, withScope } from '@sentry/node';
import { GuildChannel, Member, Message, PrivateChannel } from 'eris';
import { promises, readdir, readdirSync, statSync } from 'fs';
import { promises, statSync } from 'fs';
import i18n from 'i18n';
import { basename, resolve } from 'path';

Expand Down
132 changes: 104 additions & 28 deletions src/framework/services/RabbitMq.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { captureException } from '@sentry/node';
import { Channel, connect, Connection, Message as MQMessage } from 'amqplib';
import { Message, TextChannel } from 'eris';
import moment from 'moment';

import { ClientCacheObject, IMClient } from '../../client';
import { ShardCommand } from '../../types';
import { BotType, ShardCommand } from '../../types';
import { FakeChannel } from '../../util';

interface ShardMessage {
Expand All @@ -18,6 +19,10 @@ export class RabbitMqService {
private conn: Connection;
private connRetry: number = 0;

private qNameStartup: string;
private channelStartup: Channel;
private startTicket: MQMessage;

private qName: string;
private channel: Channel;
private channelRetry: number = 0;
Expand All @@ -33,31 +38,39 @@ export class RabbitMqService {
return;
}

await this.initConnection();
await this.initChannel();
}
private async initConnection() {
try {
const conn = await connect(this.client.config.rabbitmq);
this.conn = conn;
this.conn.on('close', async (err) => {
console.error(err);
this.conn = null;

setTimeout(() => this.init(), this.connRetry * 30);
this.connRetry++;
});
this.conn.on('error', async (err) => {
console.error(err);
this.conn = null;
if (err) {
console.error(err);
}
await this.shutdownConnection();

setTimeout(() => this.init(), this.connRetry * 30);
setTimeout(() => this.initConnection(), this.connRetry * 30);
this.connRetry++;
});

await this.initChannel();
} catch (err) {
console.error(err);
this.conn = null;

setTimeout(() => this.init(), this.connRetry * 30);
this.connRetry++;
await this.shutdownConnection();
await this.initConnection();
}
}
private async shutdownConnection() {
await this.shutdownChannel();

if (this.conn) {
try {
await this.conn.close();
} catch {
// NO-OP
}
this.conn = null;
}
}

Expand All @@ -70,18 +83,12 @@ export class RabbitMqService {
this.qName = `shard-${this.client.instance}-${this.client.shardId}`;

try {
// Regular queue
this.channel = await this.conn.createChannel();
this.channel.on('error', async (err) => {
console.error(err);
this.channel = null;

setTimeout(() => this.initChannel(), this.channelRetry * 30);
this.channelRetry++;
});
this.channel.on('close', async (err) => {
console.error(err);
this.channel = null;
if (err) {
console.error(err);
}
await this.shutdownChannel();

setTimeout(() => this.initChannel(), this.channelRetry * 30);
this.channelRetry++;
Expand All @@ -100,11 +107,80 @@ export class RabbitMqService {
this.channelRetry = 0;
} catch (err) {
console.error(err);

await this.shutdownChannel();
await this.initChannel();
}
}
private async shutdownChannel() {
if (this.channel) {
try {
await this.channel.close();
} catch {
// NO-OP
}

this.channel = null;
}
}

public async waitForStartupTicket() {
if (!this.conn) {
console.log('No connection available, this is ok for single installations or in dev mode.');
console.log('Skipping start ticket...');
return;
}

// Don't do this for custom bots
if (this.client.type === BotType.custom) {
return;
}

setTimeout(() => this.initChannel(), this.channelRetry * 30);
this.channelRetry++;
this.qNameStartup = `shard-${this.client.instance}-start`;

this.channelStartup = await this.conn.createChannel();
this.channelStartup.on('close', async (err) => {
if (err) {
captureException(err);
console.error(err);
}

console.error('Could not aquire startup ticket');
process.exit(1);
});

await this.channelStartup.prefetch(1);
await this.channelStartup.assertQueue(this.qNameStartup, { durable: true, autoDelete: false });

// Return a promise that resolves when we aquire a start ticket (a rabbitmq message)
return new Promise((resolve) => {
// Start listening on the queue for one message (our start ticket)
this.channelStartup.consume(
this.qNameStartup,
(msg) => {
console.log('Aquired start ticket...');

// Save the ticket so we can return it to the queue when our startup is done
this.startTicket = msg;
resolve();
},
{ noAck: false }
);
});
}
public async endStartup() {
if (!this.channelStartup) {
return;
}

// Nack the message, so that it gets returned to the queue for the next process to use
this.channelStartup.nack(this.startTicket, false, true);

// Close the channel because we don't want another ticket
await this.channelStartup.close();
this.channelStartup = null;

this.startTicket = null;
}

public async sendToManager(message: { id: string; [x: string]: any }, isResend: boolean = false) {
Expand Down

0 comments on commit cf01bc6

Please sign in to comment.