Skip to content

Commit

Permalink
feat(network): increase network performance by reducing unnecessary r…
Browse files Browse the repository at this point in the history
…pc calls (latticexyz#165)

* feat: faster network [wip]

* feat: no network check provider

* feat: not wait for stuff you don't care about

* chore: wip

* chore: clean up

* chore: self-review

* test: add chainId to provider config

* feat(network): add gasPriceInput$ to txQueue

* fix(network): make sure initial gas price is an integer value

Co-authored-by: ludens <ludens@lattice.xyz>
  • Loading branch information
alvrs and ludns committed Sep 29, 2022
1 parent 4eacd08 commit 5bf9eb8
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 56 deletions.
1 change: 1 addition & 0 deletions packages/network/src/createBlockNumberStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Providers } from "./createProvider";

/**
* Creates a stream of block numbers based on the `block` event of the currently connected provider.
* In case `initialSync` is provided, this stream will also output a stream of past block numbers to drive replaying events.
*
* @param providers Mobx computed providers object (created by {@link createReconnectingProvider}).
* @param options
Expand Down
1 change: 1 addition & 0 deletions packages/network/src/createNetwork.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const config = {
pollingInterval: 1000,
skipNetworkCheck: true,
},
chainId: 4242,
},
chainId: 100,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/network/src/createNetwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function createNetwork(initialConfig: NetworkConfig) {
// Sync the local time to the chain time in regular intervals
const syncBlockSub = combineLatest([blockNumber$, computedToStream(providers)])
.pipe(
throttleTime(config.clock.syncInterval, undefined, { leading: true, trailing: true }), // Update time max once per 5s
throttleTime(config.clock.syncInterval, undefined, { leading: true, trailing: true }),
concatMap(([blockNumber, currentProviders]) =>
currentProviders ? fetchBlock(currentProviders.json, blockNumber) : EMPTY
), // Fetch the latest block if a provider is available
Expand Down
15 changes: 11 additions & 4 deletions packages/network/src/createProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { JsonRpcBatchProvider, JsonRpcProvider, WebSocketProvider } from "@ethersproject/providers";
import { Networkish, WebSocketProvider } from "@ethersproject/providers";
import { callWithRetry, observableToComputed, timeoutAfter } from "@latticexyz/utils";
import { IComputedValue, IObservableValue, observable, reaction, runInAction } from "mobx";
import { ensureNetworkIsUp } from "./networkUtils";
import { MUDJsonRpcBatchProvider, MUDJsonRpcProvider } from "./provider";
import { ProviderConfig } from "./types";

export type Providers = ReturnType<typeof createProvider>;
Expand All @@ -15,10 +16,16 @@ export type Providers = ReturnType<typeof createProvider>;
* ws: WebSocketProvider
* }
*/
export function createProvider({ jsonRpcUrl, wsRpcUrl, options }: ProviderConfig) {
export function createProvider({ chainId, jsonRpcUrl, wsRpcUrl, options }: ProviderConfig) {
const network: Networkish = {
chainId,
name: "mudChain",
};
const providers = {
json: options?.batch ? new JsonRpcBatchProvider(jsonRpcUrl) : new JsonRpcProvider(jsonRpcUrl),
ws: wsRpcUrl ? new WebSocketProvider(wsRpcUrl) : undefined,
json: options?.batch
? new MUDJsonRpcBatchProvider(jsonRpcUrl, network)
: new MUDJsonRpcProvider(jsonRpcUrl, network),
ws: wsRpcUrl ? new WebSocketProvider(wsRpcUrl, network) : undefined,
};

if (options?.pollingInterval) {
Expand Down
6 changes: 4 additions & 2 deletions packages/network/src/createSystemExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Contract, ContractInterface, Signer } from "ethers";
import { observable, runInAction } from "mobx";
import { createTxQueue } from "./createTxQueue";
import { Network } from "./createNetwork";
import { BehaviorSubject } from "rxjs";

/**
* Create a system executor object.
Expand All @@ -23,7 +24,8 @@ export function createSystemExecutor<T extends { [key: string]: Contract }>(
network: Network,
systems: Component<{ value: Type.String }>,
interfaces: { [key in keyof T]: ContractInterface },
options?: { devMode?: boolean }
gasPrice$: BehaviorSubject<number>,
options?: { devMode?: boolean; concurrency?: number }
) {
const systemContracts = observable.box({} as T);
const systemIdPreimages: { [key: string]: string } = Object.keys(interfaces).reduce((acc, curr) => {
Expand All @@ -47,7 +49,7 @@ export function createSystemExecutor<T extends { [key: string]: Contract }>(
runInAction(() => systemContracts.set({ ...systemContracts.get(), [system.id]: system.contract }));
});

const { txQueue, dispose } = createTxQueue<T>(systemContracts, network, options);
const { txQueue, dispose } = createTxQueue<T>(systemContracts, network, gasPrice$, options);
world.registerDisposer(dispose);

return txQueue;
Expand Down
71 changes: 53 additions & 18 deletions packages/network/src/createTxQueue.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { BaseContract, BigNumberish, CallOverrides, Overrides } from "ethers";
import { action, autorun, computed, IComputedValue, IObservableValue, observable, runInAction } from "mobx";
import { mapObject, deferred, uuid, awaitValue, cacheUntilReady, extractEncodedArguments } from "@latticexyz/utils";
import { autorun, computed, IComputedValue, IObservableValue, observable, runInAction } from "mobx";
import { mapObject, deferred, uuid, awaitValue, cacheUntilReady } from "@latticexyz/utils";
import { Mutex } from "async-mutex";
import { TransactionResponse } from "@ethersproject/providers";
import { JsonRpcProvider, TransactionReceipt } from "@ethersproject/providers";
import { Contracts, TxQueue } from "./types";
import { ConnectionState } from "./createProvider";
import { Network } from "./createNetwork";
import { getRevertReason } from "./networkUtils";
import { BehaviorSubject } from "rxjs";

type ReturnTypeStrict<T> = T extends (...args: any) => any ? ReturnType<T> : never;

Expand All @@ -23,11 +24,16 @@ type ReturnTypeStrict<T> = T extends (...args: any) => any ? ReturnType<T> : nev
export function createTxQueue<C extends Contracts>(
computedContracts: IComputedValue<C> | IObservableValue<C>,
network: Network,
gasPrice$: BehaviorSubject<number>,
options?: { concurrency?: number; devMode?: boolean }
): { txQueue: TxQueue<C>; dispose: () => void; ready: IComputedValue<boolean | undefined> } {
const { concurrency } = options || {};

const queue = createPriorityQueue<{
execute: (nonce: number, gasLimit: BigNumberish) => Promise<TransactionResponse>;
execute: (
nonce: number,
gasLimit: BigNumberish
) => Promise<{ hash: string; wait: () => Promise<TransactionReceipt> }>;
estimateGas: () => BigNumberish | Promise<BigNumberish>;
cancel: () => void;
stateMutability?: string;
Expand Down Expand Up @@ -71,8 +77,16 @@ export function createTxQueue<C extends Contracts>(
target: C[keyof C],
prop: keyof C[keyof C],
args: unknown[]
): Promise<ReturnTypeStrict<typeof target[typeof prop]>> {
const [resolve, reject, promise] = deferred<ReturnTypeStrict<typeof target[typeof prop]>>();
): Promise<{
hash: string;
wait: () => Promise<TransactionReceipt>;
response: Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
}> {
const [resolve, reject, promise] = deferred<{
hash: string;
wait: () => Promise<TransactionReceipt>;
response: Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
}>();

// Extract existing overrides from function call
const hasOverrides = args.length > 0 && isOverrides(args[args.length - 1]);
Expand All @@ -90,7 +104,7 @@ export function createTxQueue<C extends Contracts>(
// Create a function that executes the tx when called
const execute = async (nonce: number, gasLimit: BigNumberish) => {
try {
const member = target[prop];
const member = target.populateTransaction[prop as string];
if (member == undefined) {
throw new Error("Member does not exist.");
}
Expand All @@ -103,12 +117,34 @@ export function createTxQueue<C extends Contracts>(
);
}

const configOverrides = { ...overrides, nonce, gasLimit };
// Populate config
const configOverrides = {
gasPrice: gasPrice$.getValue(),
...overrides,
nonce,
gasLimit,
};
if (options?.devMode) configOverrides.gasPrice = 0;

const result = await member(...argsWithoutOverrides, configOverrides);
resolve(result);
return result;
// Populate tx
const populatedTx = await member(...argsWithoutOverrides, configOverrides);
populatedTx.nonce = nonce;
populatedTx.chainId = network.config.chainId;

// Execute tx
const signedTx = await target.signer.signTransaction(populatedTx);
const hash = await (target.provider as JsonRpcProvider).perform("sendTransaction", {
signedTransaction: signedTx,
});
const response = target.provider.getTransaction(hash) as Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
// This promise is awaited asynchronously in the tx queue and the action queue to catch errors
const wait = async () => (await response).wait();

// Resolved value goes to the initiator of the transaction
resolve({ hash, wait, response });

// Returned value gets processed inside the tx queue
return { hash, wait };
} catch (e) {
reject(e as Error);
throw e; // Rethrow error to catch when processing the queue
Expand Down Expand Up @@ -137,6 +173,11 @@ export function createTxQueue<C extends Contracts>(
// Increase utilization to prevent executing more tx than allowed by capacity
utilization++;

// Start processing another request from the queue
// Note: we start processing again after increasing the utilization to process up to `concurrency` tx request in parallel.
// At the end of this function after decreasing the utilization we call processQueue again trigger tx requests waiting for capacity.
processQueue();

// Run exclusive to avoid two tx requests awaiting the nonce in parallel and submitting with the same nonce.
const txResult = await submissionMutex.runExclusive(async () => {
// Define variables in scope visible to finally block
Expand Down Expand Up @@ -173,7 +214,7 @@ export function createTxQueue<C extends Contracts>(
(("code" in error && error.code === "NONCE_EXPIRED") ||
JSON.stringify(error).includes("transaction already imported"));

console.log("TxQueue:", {
console.info("TxQueue:", {
error,
isNonViewTransaction,
shouldIncreaseNonce,
Expand Down Expand Up @@ -203,16 +244,10 @@ export function createTxQueue<C extends Contracts>(
const worldAddress = params.get("worldAddress");
// Log useful commands that can be used to replay this tx
const trace = `mud trace --config deploy.json --world ${worldAddress} --tx ${txResult.hash}`;
const call = `mud call-system --world ${worldAddress} --caller ${network.connectedAddress.get()} --calldata ${extractEncodedArguments(
txResult.data
)} --systemId <SYSTEM_ID>`;

console.log("---------- DEBUG COMMANDS (RUN IN TERMINAL) -------------");
console.log("Trace:");
console.log(trace);
console.log("//");
console.log("Call system:");
console.log(call);
console.log("---------------------------------------------------------");
}
}
Expand Down
28 changes: 28 additions & 0 deletions packages/network/src/provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { JsonRpcBatchProvider, JsonRpcProvider, Network, Networkish } from "@ethersproject/providers";
import { ConnectionInfo } from "ethers/lib/utils";

export class MUDJsonRpcProvider extends JsonRpcProvider {
constructor(url: string | ConnectionInfo | undefined, network: Networkish) {
super(url, network);
}
async detectNetwork(): Promise<Network> {
const network = this.network;
if (network == null) {
throw new Error("No network");
}
return network;
}
}

export class MUDJsonRpcBatchProvider extends JsonRpcBatchProvider {
constructor(url?: string | ConnectionInfo | undefined, network?: Networkish | undefined) {
super(url, network);
}
async detectNetwork(): Promise<Network> {
const network = this.network;
if (network == null) {
throw new Error("No network");
}
return network;
}
}
1 change: 1 addition & 0 deletions packages/network/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type Clock = {
};

export interface ProviderConfig {
chainId: number;
jsonRpcUrl: string;
wsRpcUrl?: string;
options?: { batch?: boolean; pollingInterval?: number; skipNetworkCheck?: boolean };
Expand Down
49 changes: 41 additions & 8 deletions packages/network/src/workers/SyncWorker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { JsonRpcProvider } from "@ethersproject/providers";
import { keccak256, sleep } from "@latticexyz/utils";
import { computed } from "mobx";
Expand Down Expand Up @@ -143,7 +144,11 @@ describe("Sync.worker", () => {
checkpointServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
chainId: 4242,
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -171,7 +176,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
chainId: 4242,
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -201,7 +210,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});
await sleep(0);
Expand All @@ -215,7 +228,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "http://localhost:50052",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});
await sleep(0);
Expand All @@ -229,7 +246,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand All @@ -253,7 +274,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand All @@ -277,7 +302,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -311,7 +340,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand Down
6 changes: 4 additions & 2 deletions packages/network/src/workers/SyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
const cacheBlockNumber = await getIndexDBCacheStoreBlockNumber(indexDbCache);
const snapshotBlockNumber = await getSnapshotBlockNumber(snapshotClient, worldContract.address);
console.log(
`[SyncWorker] cache block: ${cacheBlockNumber}, snapshot block: ${snapshotBlockNumber}, start sync at ${initialBlockNumber}`
`[SyncWorker] cache block: ${cacheBlockNumber}, snapshot block: ${
snapshotBlockNumber > 0 ? snapshotBlockNumber : "Unavailable"
}, start sync at ${initialBlockNumber}`
);
let initialState = createCacheStore();
if (initialBlockNumber > Math.max(cacheBlockNumber, snapshotBlockNumber)) {
Expand Down Expand Up @@ -184,7 +186,7 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
);

console.log(
`[SyncWorker] got ${gapStateEvents.length} items from block range ${initialState.blockNumber} -> ${streamStartBlockNumber}`
`[SyncWorker || via JSON-RPC] got ${gapStateEvents.length} items from block range ${initialState.blockNumber} -> ${streamStartBlockNumber}`
);

// Merge initial state, gap state and live events since initial sync started
Expand Down
Loading

0 comments on commit 5bf9eb8

Please sign in to comment.