Skip to content

Commit

Permalink
Merge pull request #4459 from WalletConnect/feat/batch_fetch_messages
Browse files Browse the repository at this point in the history
feat: implements `irn_batchFetchMessages`
  • Loading branch information
ganchoradkov committed Apr 22, 2024
2 parents 6b68620 + 69bda77 commit 82298c9
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 27 deletions.
36 changes: 14 additions & 22 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"@walletconnect/jsonrpc-ws-connection": "1.0.14",
"@walletconnect/keyvaluestorage": "1.1.1",
"@walletconnect/logger": "2.1.2",
"@walletconnect/relay-api": "1.0.9",
"@walletconnect/relay-api": "1.0.10",
"@walletconnect/relay-auth": "1.0.4",
"@walletconnect/safe-json": "1.0.2",
"@walletconnect/time": "1.0.2",
Expand Down
17 changes: 17 additions & 0 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ export class Relayer extends IRelayer {
throw new Error("No internet connection detected. Please restart your network and try again.");
}

public async handleBatchMessageEvents(messages: RelayerTypes.MessageEvent[]) {
if (messages?.length === 0) {
this.logger.trace("Batch message events is empty. Ignoring...");
return;
}
const sortedMessages = messages.sort((a, b) => a.publishedAt - b.publishedAt);
this.logger.trace(`Batch of ${sortedMessages.length} message events sorted`);
for (const message of sortedMessages) {
try {
await this.onMessageEvent(message);
} catch (e) {
this.logger.warn(e);
}
}
this.logger.trace(`Batch of ${sortedMessages.length} message events processed`);
}

// ---------- Private ----------------------------------------------- //
/*
* In Node, we must detect when the connection is stalled and terminate it.
Expand Down
48 changes: 46 additions & 2 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export class Subscriber extends ISubscriber {
private restartInProgress = false;
private clientId: string;
private batchSubscribeTopicsLimit = 500;
private pendingBatchMessages: RelayerTypes.MessageEvent[] = [];

constructor(public relayer: IRelayer, public logger: Logger) {
super(relayer, logger);
this.relayer = relayer;
Expand Down Expand Up @@ -263,6 +265,33 @@ export class Subscriber extends ISubscriber {
}
}

private async rpcBatchFetchMessages(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay.protocol);
const request: RequestArguments<RelayJsonRpc.BatchFetchMessagesParams> = {
method: api.batchFetchMessages,
params: {
topics: subscriptions.map((s) => s.topic),
},
};
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "payload", direction: "outgoing", request });
let result;
try {
const fetchMessagesPromise = await createExpiringPromise(
this.relayer.request(request).catch((e) => this.logger.warn(e)),
this.subscribeTimeout,
);
result = (await fetchMessagesPromise) as {
messages: RelayerTypes.MessageEvent[];
};
} catch (err) {
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
}
return result;
}

private rpcUnsubscribe(topic: string, id: string, relay: RelayerTypes.ProtocolOptions) {
const api = getRelayProtocolApi(relay.protocol);
const request: RequestArguments<RelayJsonRpc.UnsubscribeParams> = {
Expand Down Expand Up @@ -361,9 +390,10 @@ export class Subscriber extends ISubscriber {

private async reset() {
if (this.cached.length) {
const batches = Math.ceil(this.cached.length / this.batchSubscribeTopicsLimit);
for (let i = 0; i < batches; i++) {
const numOfBatches = Math.ceil(this.cached.length / this.batchSubscribeTopicsLimit);
for (let i = 0; i < numOfBatches; i++) {
const batch = this.cached.splice(0, this.batchSubscribeTopicsLimit);
await this.batchFetchMessages(batch);
await this.batchSubscribe(batch);
}
}
Expand Down Expand Up @@ -399,6 +429,15 @@ export class Subscriber extends ISubscriber {
this.onBatchSubscribe(result.map((id, i) => ({ ...subscriptions[i], id })));
}

private async batchFetchMessages(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
this.logger.trace(`Fetching batch messages for ${subscriptions.length} subscriptions`);
const response = await this.rpcBatchFetchMessages(subscriptions);
if (response && response.messages) {
this.pendingBatchMessages = this.pendingBatchMessages.concat(response.messages);
}
}

private async onConnect() {
await this.restart();
this.onEnable();
Expand All @@ -416,6 +455,11 @@ export class Subscriber extends ISubscriber {
pendingSubscriptions.push(params);
});
await this.batchSubscribe(pendingSubscriptions);

if (this.pendingBatchMessages.length) {
await this.relayer.handleBatchMessageEvents(this.pendingBatchMessages);
this.pendingBatchMessages = [];
}
}

private registerEventListeners() {
Expand Down
49 changes: 49 additions & 0 deletions packages/core/test/subscriber.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,55 @@ describe("Subscriber", () => {
await disconnectSocket(core.relayer);
});

describe("init", () => {
it("should call batch fetch messages on init when it has cached topics", async () => {
const requestSpy: Sinon.SinonSpy = Sinon.spy(() => {
return {};
});
subscriber.relayer.provider.request = requestSpy;

const topic = generateRandomBytes32();
// manually switch off the subscriber
// @ts-expect-error
subscriber.onDisconnect();
// add a topic to the subscriber as if it was loaded from persistence
// @ts-expect-error
subscriber.cached = [{ topic, relay: { protocol: "irn" } }];

// restart the subscriber
// @ts-expect-error
subscriber.onConnect();

await new Promise((resolve) => setTimeout(resolve, 2000));

// first req should be the batch fetch messages call followed by the batch subscribe call
expect(requestSpy.getCalls().length).toBe(2);
expect(requestSpy.getCalls()[0].args[0].method).toBe("irn_batchFetchMessages");
expect(requestSpy.getCalls()[1].args[0].method).toBe("irn_batchSubscribe");
expect(
requestSpy.calledWith(
Sinon.match({
method: "irn_batchFetchMessages",
params: {
topics: [topic],
},
}),
),
).to.be.true;

expect(
requestSpy.calledWith(
Sinon.match({
method: "irn_batchSubscribe",
params: {
topics: [topic],
},
}),
),
).to.be.true;
});
});

describe("storageKey", () => {
it("provides the expected default `storageKey` format", () => {
const subscriber = new Subscriber(relayer, logger);
Expand Down
2 changes: 1 addition & 1 deletion packages/sign-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@
"@aws-sdk/client-cloudwatch": "3.450.0",
"@walletconnect/jsonrpc-provider": "1.0.13",
"@walletconnect/jsonrpc-ws-connection": "1.0.14",
"@walletconnect/relay-api": "1.0.9"
"@walletconnect/relay-api": "1.0.10"
}
}
1 change: 1 addition & 0 deletions packages/types/src/core/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,5 @@ export abstract class IRelayer extends IEvents {
public abstract transportOpen(relayUrl?: string): Promise<void>;
public abstract restartTransport(relayUrl?: string): Promise<void>;
public abstract confirmOnlineStateOrThrow(): Promise<void>;
public abstract handleBatchMessageEvents(messages: RelayerTypes.MessageEvent[]): Promise<void>;
}
2 changes: 1 addition & 1 deletion packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@stablelib/random": "1.0.2",
"@stablelib/sha256": "1.0.1",
"@stablelib/x25519": "1.0.3",
"@walletconnect/relay-api": "1.0.9",
"@walletconnect/relay-api": "1.0.10",
"@walletconnect/safe-json": "1.0.2",
"@walletconnect/time": "1.0.2",
"@walletconnect/types": "2.12.2",
Expand Down

0 comments on commit 82298c9

Please sign in to comment.