Skip to content

Commit

Permalink
refactor: strong typed EventEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Feb 4, 2024
1 parent 141542e commit 808aea9
Show file tree
Hide file tree
Showing 34 changed files with 1,136 additions and 1,267 deletions.
12 changes: 5 additions & 7 deletions lib/BaseClient.ts
@@ -1,14 +1,13 @@
import { EventEmitter } from 'events';
import Logger from './Logger';
import { racePromise } from './PromiseUtils';
import { ClientStatus } from './consts/Enums';
import TypedEventEmitter from './consts/TypedEventEmitter';

interface IBaseClient {
on(event: 'status.changed', listener: (status: ClientStatus) => void): void;
emit(event: 'status.changed', status: ClientStatus): void;
}
type BaseClientEvents = { 'status.changed': ClientStatus };

abstract class BaseClient extends EventEmitter implements IBaseClient {
abstract class BaseClient<
T extends BaseClientEvents = BaseClientEvents,
> extends TypedEventEmitter<T> {
protected readonly RECONNECT_INTERVAL = 5000;
protected reconnectionTimer?: any;

Expand Down Expand Up @@ -75,4 +74,3 @@ abstract class BaseClient extends EventEmitter implements IBaseClient {
}

export default BaseClient;
export { IBaseClient };
8 changes: 4 additions & 4 deletions lib/api/Controller.ts
Expand Up @@ -45,14 +45,14 @@ class Controller {
private readonly service: Service,
private readonly countryCodes: CountryCodes,
) {
this.service.eventHandler.on('swap.update', (id, message) => {
this.logger.debug(`Swap ${id} update: ${saneStringify(message)}`);
this.pendingSwapInfos.set(id, message);
this.service.eventHandler.on('swap.update', ({ id, status }) => {
this.logger.debug(`Swap ${id} update: ${saneStringify(status)}`);
this.pendingSwapInfos.set(id, status);

const response = this.pendingSwapStreams.get(id);

if (response) {
this.writeToSse(response, message);
this.writeToSse(response, status);
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion lib/backup/BackupScheduler.ts
Expand Up @@ -153,7 +153,7 @@ class BackupScheduler {
private subscribeChannelBackups = () => {
this.eventHandler.on(
'channel.backup',
async (currency: string, channelBackup: string) => {
async ({ currency, channelBackup }) => {
const dateString = BackupScheduler.getDate(new Date());

await this.uploadString(
Expand Down
31 changes: 10 additions & 21 deletions lib/chain/ChainClient.ts
Expand Up @@ -26,25 +26,14 @@ enum AddressType {
Taproot = 'bech32m',
}

interface ChainClient {
on(event: 'block', listener: (height: number) => void): this;
emit(event: 'block', height: number): boolean;

on(
event: 'transaction',
listener: (
transaction: Transaction | LiquidTransaction,
confirmed: boolean,
) => void,
): this;
emit(
event: 'transaction',
transaction: Transaction | LiquidTransaction,
confirmed: boolean,
): boolean;
}

class ChainClient extends BaseClient {
class ChainClient extends BaseClient<{
'status.changed': ClientStatus;
block: number;
transaction: {
transaction: Transaction | LiquidTransaction;
confirmed: boolean;
};
}> {
public static readonly serviceName = 'Core';
public static readonly decimals = 100000000;

Expand Down Expand Up @@ -336,8 +325,8 @@ class ChainClient extends BaseClient {
this.emit('block', height);
});

this.zmqClient.on('transaction', (transaction, confirmed) => {
this.emit('transaction', transaction, confirmed);
this.zmqClient.on('transaction', ({ transaction, confirmed }) => {
this.emit('transaction', { transaction, confirmed });
});
};

Expand Down
36 changes: 12 additions & 24 deletions lib/chain/ZmqClient.ts
@@ -1,12 +1,12 @@
import AsyncLock from 'async-lock';
import { Transaction, crypto } from 'bitcoinjs-lib';
import { EventEmitter } from 'events';
import { Transaction as LiquidTransaction } from 'liquidjs-lib';
import zmq, { Socket } from 'zeromq';
import { parseTransaction } from '../Core';
import Logger from '../Logger';
import { formatError, getHexString, reverseBuffer } from '../Utils';
import { CurrencyType } from '../consts/Enums';
import TypedEventEmitter from '../consts/TypedEventEmitter';
import { RawTransaction } from '../consts/Types';
import ChainClient from './ChainClient';
import Errors from './Errors';
Expand All @@ -22,25 +22,13 @@ const filters = {
hashBlock: 'pubhashblock',
};

interface ZmqClient {
on(event: 'block', listener: (height: number) => void): this;
emit(event: 'block', height: number): boolean;

on(
event: 'transaction',
listener: (
transaction: Transaction | LiquidTransaction,
confirmed: boolean,
) => void,
): this;
emit(
event: 'transaction',
transaction: Transaction | LiquidTransaction,
confirmed: boolean,
): boolean;
}

class ZmqClient extends EventEmitter {
class ZmqClient extends TypedEventEmitter<{
block: number;
transaction: {
transaction: Transaction | LiquidTransaction;
confirmed: boolean;
};
}> {
// IDs of transactions that contain a UTXOs of Boltz
public utxos = new Set<string>();

Expand Down Expand Up @@ -150,7 +138,7 @@ class ZmqClient extends EventEmitter {
public rescanChain = async (startHeight: number): Promise<void> => {
const checkTransaction = (transaction: Transaction | LiquidTransaction) => {
if (this.isRelevantTransaction(transaction)) {
this.emit('transaction', transaction, true);
this.emit('transaction', { transaction, confirmed: true });
}
};

Expand Down Expand Up @@ -201,7 +189,7 @@ class ZmqClient extends EventEmitter {
// the second time the client receives the transaction
if (this.utxos.has(id)) {
this.utxos.delete(id);
this.emit('transaction', transaction, true);
this.emit('transaction', { transaction, confirmed: true });

return;
}
Expand All @@ -214,10 +202,10 @@ class ZmqClient extends EventEmitter {

// Check whether the transaction got confirmed or added to the mempool
if (transactionData.confirmations) {
this.emit('transaction', transaction, true);
this.emit('transaction', { transaction, confirmed: true });
} else {
this.utxos.add(id);
this.emit('transaction', transaction, false);
this.emit('transaction', { transaction, confirmed: false });
}
}
});
Expand Down
20 changes: 20 additions & 0 deletions lib/consts/TypedEventEmitter.ts
@@ -0,0 +1,20 @@
import { EventEmitter } from 'events';

class TypedEventEmitter<
T extends Record<string | symbol, any>,
> extends EventEmitter {
public on = <K extends keyof T>(
event: K,
listener: (arg: T[K]) => void,
): this => super.on(event as string | symbol, listener);

public once = <K extends keyof T>(
event: K,
listener: (arg: T[K]) => void,
): this => super.once(event as string | symbol, listener);

public emit = <K extends keyof T>(event: K, arg: T[K]): boolean =>
super.emit(event as string | symbol, arg);
}

export default TypedEventEmitter;
44 changes: 13 additions & 31 deletions lib/lightning/LightningClient.ts
@@ -1,4 +1,4 @@
import { IBaseClient } from '../BaseClient';
import BaseClient from '../BaseClient';
import { decodeInvoiceAmount } from '../Utils';
import { ClientStatus } from '../consts/Enums';
import * as lndrpc from '../proto/lnd/rpc_pb';
Expand Down Expand Up @@ -82,38 +82,19 @@ type Route = {
feesMsat: number;
};

interface LightningClient extends BalancerFetcher, IBaseClient {
on(event: 'peer.online', listener: (publicKey: string) => void): void;
emit(event: 'peer.online', publicKey: string): boolean;

type EventTypes = {
'status.changed': ClientStatus;
'peer.online': string;
// TODO: get rid of LND types
on(
even: 'channel.active',
listener: (channel: lndrpc.ChannelPoint.AsObject) => void,
): void;
emit(even: 'channel.active', channel: lndrpc.ChannelPoint.AsObject): boolean;

on(event: 'htlc.accepted', listener: (invoice: string) => void): void;
emit(event: 'htlc.accepted', invoice: string): boolean;

on(event: 'invoice.settled', listener: (invoice: string) => void): void;
emit(event: 'invoice.settled', string: string): boolean;

on(event: 'channel.backup', listener: (channelBackup: string) => void): void;
emit(event: 'channel.backup', channelBackup: string): boolean;

on(
event: 'subscription.error',
listener: (subscription?: string) => void,
): void;
emit(event: 'subscription.error', subscription?: string): void;

on(event: 'subscription.reconnected', listener: () => void): void;
emit(event: 'subscription.reconnected'): void;

on(event: 'status.changed', listener: (status: ClientStatus) => void): void;
emit(event: 'status.changed', status: ClientStatus): void;
'channel.active': lndrpc.ChannelPoint.AsObject;
'htlc.accepted': string;
'invoice.settled': string;
'channel.backup': string;
'subscription.error': string | undefined;
'subscription.reconnected': null;
};

interface LightningClient extends BalancerFetcher, BaseClient<EventTypes> {
symbol: string;

isConnected(): boolean;
Expand Down Expand Up @@ -185,6 +166,7 @@ export {
Invoice,
NodeInfo,
HtlcState,
EventTypes,
ChannelInfo,
InvoiceState,
InvoiceFeature,
Expand Down
7 changes: 4 additions & 3 deletions lib/lightning/LndClient.ts
Expand Up @@ -27,6 +27,7 @@ import { grpcOptions, unaryCall } from './GrpcUtils';
import {
ChannelInfo,
DecodedInvoice,
EventTypes,
HopHint,
Htlc,
HtlcState,
Expand Down Expand Up @@ -54,7 +55,7 @@ type LndConfig = {
/**
* A class representing a client to interact with LND
*/
class LndClient extends BaseClient implements LightningClient {
class LndClient extends BaseClient<EventTypes> implements LightningClient {
public static readonly serviceName = 'LND';

public static readonly paymentMinFee = 121;
Expand Down Expand Up @@ -188,7 +189,7 @@ class LndClient extends BaseClient implements LightningClient {
this.subscribeChannelBackups();

this.setClientStatus(ClientStatus.Connected);
this.emit('subscription.reconnected');
this.emit('subscription.reconnected', null);
} catch (err) {
this.setClientStatus(ClientStatus.Disconnected);

Expand Down Expand Up @@ -968,7 +969,7 @@ class LndClient extends BaseClient implements LightningClient {
);

if (this.isConnected()) {
this.emit('subscription.error');
this.emit('subscription.error', subscriptionName);
await this.reconnect();
}
};
Expand Down
7 changes: 4 additions & 3 deletions lib/lightning/cln/ClnClient.ts
Expand Up @@ -31,6 +31,7 @@ import { grpcOptions, unaryCall } from '../GrpcUtils';
import {
ChannelInfo,
DecodedInvoice,
EventTypes,
HopHint,
Htlc,
HtlcState,
Expand All @@ -50,7 +51,7 @@ import { ClnConfig, createSsl } from './Types';
import ListpaysPaysStatus = ListpaysPays.ListpaysPaysStatus;

class ClnClient
extends BaseClient
extends BaseClient<EventTypes>
implements LightningClient, RoutingHintsProvider
{
public static readonly serviceName = 'CLN';
Expand Down Expand Up @@ -213,7 +214,7 @@ class ClnClient
this.subscribeTrackHoldInvoices();

this.setClientStatus(ClientStatus.Connected);
this.emit('subscription.reconnected');
this.emit('subscription.reconnected', null);
} catch (err) {
this.setClientStatus(ClientStatus.Disconnected);

Expand Down Expand Up @@ -867,7 +868,7 @@ class ClnClient
);

if (this.isConnected()) {
this.emit('subscription.error');
this.emit('subscription.error', subscriptionName);
setTimeout(() => this.reconnect(), this.RECONNECT_INTERVAL);
}
};
Expand Down
7 changes: 4 additions & 3 deletions lib/notifications/NotificationProvider.ts
Expand Up @@ -3,6 +3,7 @@ import { satoshisToSatcomma } from '../DenominationConverter';
import Logger from '../Logger';
import {
decodeInvoice,
formatError,
getChainCurrency,
getLightningCurrency,
getSendingReceivingCurrency,
Expand Down Expand Up @@ -179,7 +180,7 @@ class NotificationProvider {

private listenToDiscord = () => {
this.client.on('error', (error) => {
this.logger.warn(`Discord client threw: ${error.message}`);
this.logger.warn(`Discord client threw: ${formatError(error)}`);
});
};

Expand Down Expand Up @@ -250,7 +251,7 @@ class NotificationProvider {

this.service.eventHandler.on(
'swap.success',
async (swap, isReverse, channelCreation) => {
async ({ swap, isReverse, channelCreation }) => {
const { onchainSymbol, lightningSymbol } = getSymbols(
swap.pair,
swap.orderSide,
Expand Down Expand Up @@ -298,7 +299,7 @@ class NotificationProvider {

this.service.eventHandler.on(
'swap.failure',
async (swap, isReverse, reason) => {
async ({ swap, isReverse, reason }) => {
const { onchainSymbol, lightningSymbol } = getSymbols(
swap.pair,
swap.orderSide,
Expand Down
7 changes: 2 additions & 5 deletions lib/notifications/clients/MattermostClient.ts
Expand Up @@ -3,12 +3,9 @@ import ws from 'ws';
import { NotificationConfig } from '../../Config';
import Logger from '../../Logger';
import { codeBlock } from '../Markup';
import NotificationClient, { INotificationClient } from './NotificationClient';
import NotificationClient from './NotificationClient';

class MattermostClient
extends NotificationClient
implements INotificationClient
{
class MattermostClient extends NotificationClient {
public static readonly serviceName = 'Mattermost';

private readonly client: Client4;
Expand Down

0 comments on commit 808aea9

Please sign in to comment.