Skip to content
This repository was archived by the owner on Dec 27, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions modules/engine/src/merkleTree.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { ethers as eth } from 'ethers'

const { arrayify, concat, hexlify, isHexString, keccak256, padZeros } = eth.utils

const combinedHash = (first: string, second: string): string => {
if (!second) { return first }
if (!first) { return second }
return keccak256(concat([first, second].sort()))
}

export class MerkleTree {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure I see why this needs to be a class as opposed to just pure functions.

public elements: string[]
public root: string
public layers: string[][]

public constructor(_elements: string[]) {
if (!_elements.every((e: string): boolean => isHexString(e) && arrayify(e).length === 32)) {
throw new Error('Each element must be a 32 byte hex string')
}

// deduplicate elements
this.elements = _elements.filter((element: string, i: number): boolean =>
_elements.findIndex((e: string): boolean => element === e) === i,
).sort()

// Can't have an odd number of leaves
if (this.elements.length % 2 !== 0) {
this.elements.push(eth.constants.HashZero)
}

// Build merkle tree layers
this.layers = []
// Set root to HashZero if given zero elements
if (this.elements.length === 0) {
this.layers.push([eth.constants.HashZero])
} else {
this.layers.push(this.elements)
while (this.topLayer.length > 1) {
this.layers.push(this.topLayer.reduce(
(layer: string[], element: string, index: number, arr: string[]): string[] =>
index % 2 ? layer : layer.concat([combinedHash(element, arr[index + 1])]),
[],
))
}
}

this.root = this.topLayer[0]
}

public get topLayer(): string[] {
return this.layers[this.layers.length -1]
}

public proof(element: string): string {
let index = this.elements.findIndex((e: string): boolean => e === element)
if (index === -1) { throw new Error('element not found in merkle tree') }
const proofArray = this.layers.reduce((proof: string[], layer: string[]): string[] => {
const pairIndex: number = index % 2 ? index - 1 : index + 1
if (pairIndex < layer.length) {
proof.push(layer[pairIndex])
}
index = Math.floor(index / 2)
return proof
}, [element])
return hexlify(concat(proofArray))
}

public verify(proof: string): boolean {
const proofArray: RegExpMatchArray = proof.substring(2).match(/.{64}/g) || []
if (!proofArray || proofArray.length * 64 !== proof.length -2) {
console.warn(`Invalid proof: expected a hex string describing n 32 byte chunks`)
return false
}
const proofs: string[] = proofArray.map((p: string): string => `0x${p.replace('0x', '')}`)
return this.root === (proofs.slice(1).reduce(combinedHash, proofs[0]))
}

}
168 changes: 115 additions & 53 deletions modules/engine/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,67 @@ import { BigNumber } from "ethers";
import { Evt } from "evt";

import { ChannelUpdateError } from "./errors";
import { ChannelUpdate, MultisigCommitment, ChannelState, IStoreService, IMessagingService, VectorMessage, VectorChannelMessage, VectorErrorMessage, UpdateType } from "./types";
import {
ChannelUpdate,
MultisigCommitment,
IStoreService,
IMessagingService,
VectorMessage,
VectorChannelMessage,
VectorErrorMessage,
UpdateType,
FullChannelState,
} from "./types";
import { delay, logger, isChannelMessage, isChannelState } from "./utils";
import { validate } from "./validate";
import { applyUpdate } from "./update";

// Function responsible for handling user-initated/outbound channel updates.
// These updates will be single signed, the function should dispatch the
// message to the counterparty, and resolve once the updated channel state
// These updates will be single signed, the function should dispatch the
// message to the counterparty, and resolve once the updated channel state
// has been persisted.
export async function outbound(
update: ChannelUpdate,
update: ChannelUpdate,
storeService: IStoreService,
messagingService: IMessagingService,
stateEvt: Evt<ChannelState>,
messagingService: IMessagingService,
stateEvt: Evt<ChannelState>,
errorEvt: Evt<ChannelUpdateError>,
): Promise<ChannelState> {
const storedChannel = await storeService.getChannelState(update.channelAddress);
if (!storedChannel) {
// NOTE: IFF creating a channel, the initial channel state should be
// created and saved using `generate` (i.e. before it gets to this
// created and saved using `generate` (i.e. before it gets to this
// function call)
throw new ChannelUpdateError(ChannelUpdateError.reasons.ChannelNotFound, update, storedChannel);
}
// Create a helper function that will create a function that properly
// sets up the promise handlers. The only time this promise should
// reject instead of resolve is if *sending* the message failed. In
// that case, this should be safe to retry on failure
const generatePromise = () => new Promise<ChannelState | ChannelUpdateError>((resolve, reject) => {
// If there is an error event corresponding to this channel and
// this nonce, reject the promise
errorEvt.pipe((e: ChannelUpdateError) => {
return e.update.nonce === update.nonce && e.update.channelAddress === e.update.channelAddress
})
.attachOnce((e: ChannelUpdateError) => resolve(e));

// If there is a channel update event corresponding to
// this channel update, resolve the promise
stateEvt.pipe((e: ChannelState) => {
return e.channelAddress === update.channelAddress && e.latestNonce === update.nonce
})
.attachOnce((e: ChannelState) => resolve(e));

// TODO: turn `update` into a DTO before sending?
// TODO: what if there is no latest update?
messagingService.send(update.counterpartyPublicIdentifier, { update, latestUpdate: storedChannel.latestUpdate }).catch(e => reject(e.message));
});
const generatePromise = () =>
new Promise<ChannelState | ChannelUpdateError>((resolve, reject) => {
// If there is an error event corresponding to this channel and
// this nonce, reject the promise
errorEvt
.pipe((e: ChannelUpdateError) => {
return e.update.nonce === update.nonce && e.update.channelAddress === e.update.channelAddress;
})
.attachOnce((e: ChannelUpdateError) => resolve(e));

// If there is a channel update event corresponding to
// this channel update, resolve the promise
stateEvt
.pipe((e: ChannelState) => {
return e.channelAddress === update.channelAddress && e.latestNonce === update.nonce;
})
.attachOnce((e: ChannelState) => resolve(e));

// TODO: turn `update` into a DTO before sending?
// TODO: what if there is no latest update?
messagingService
.send(update.counterpartyPublicIdentifier, { update, latestUpdate: storedChannel.latestUpdate })
.catch((e) => reject(e.message));
});

// Retry sending the message 5 times w/3s delay
const sendWithRetry = async () => {
Expand Down Expand Up @@ -88,18 +104,24 @@ export async function outbound(

// Make sure the update is the correct one
if (result.state.latestUpdate.nonce !== update.nonce) {
throw new ChannelUpdateError(ChannelUpdateError.reasons.StaleChannelNonce, update, storedChannel, { counterpartyLatestUpdate: result.state.latestUpdate });
throw new ChannelUpdateError(ChannelUpdateError.reasons.StaleChannelNonce, update, storedChannel, {
counterpartyLatestUpdate: result.state.latestUpdate,
});
}

// Apply the update, and retry the update
let newState: string | ChannelState;
try {
newState = await mergeUpdate(result.state.latestUpdate, storedChannel);
newState = await applyUpdate(result.state.latestUpdate, storedChannel);
} catch (e) {
newState = e.message;
}
if (typeof newState === "string") {
throw new ChannelUpdateError(ChannelUpdateError.reasons.MergeUpdateFailed, result.state.latestUpdate, storedChannel);
throw new ChannelUpdateError(
ChannelUpdateError.reasons.applyUpdateFailed,
result.state.latestUpdate,
storedChannel,
);
}

// Save the updated state before retrying the update
Expand All @@ -110,10 +132,13 @@ export async function outbound(
error = e.message;
}
if (error) {
throw new ChannelUpdateError(ChannelUpdateError.reasons.SaveChannelFailed, result.state.latestUpdate, storedChannel);
throw new ChannelUpdateError(
ChannelUpdateError.reasons.SaveChannelFailed,
result.state.latestUpdate,
storedChannel,
);
}


// Retry the update
const syncedResult = await sendWithRetry();
if (!isChannelState(syncedResult)) {
Expand All @@ -125,12 +150,12 @@ export async function outbound(
// This function is responsible for handling any inbound vector messages.
// This function is expected to handle errors and updates from a counterparty.
export async function inbound(
message: VectorMessage,
message: VectorMessage,
storeService: IStoreService,
messagingService: IMessagingService,
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
signer: any,
stateEvt: Evt<ChannelState>,
stateEvt: Evt<ChannelState>,
errorEvt: Evt<ChannelUpdateError>,
): Promise<void> {
// If the message is from us, ignore
Expand All @@ -151,7 +176,7 @@ export async function inbound(

// This function is responsible for handling any inbound state requests.
async function processChannelMessage(
message: VectorChannelMessage,
message: VectorChannelMessage,
storeService: IStoreService,
messagingService: IMessagingService,
signer: any,
Expand Down Expand Up @@ -183,7 +208,9 @@ async function processChannelMessage(
// being created for the first time. If this is the case, create an
// empty channel and continue through the function
if (requestedUpdate.type !== UpdateType.setup) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.ChannelNotFound, requestedUpdate, storedState));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.ChannelNotFound, requestedUpdate, storedState),
);
}
// Create an empty channel state
storedState = {
Expand Down Expand Up @@ -226,7 +253,12 @@ async function processChannelMessage(
// TODO: should also make sure that there are *2* signatures
await counterpartyLatestUpdate.commitment.assertSignatures();
} catch (e) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.BadSignatures, requestedUpdate, storedState, {counterpartyLatestUpdate, error: e.message}));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.BadSignatures, requestedUpdate, storedState, {
counterpartyLatestUpdate,
error: e.message,
}),
);
}

// Get the difference between the stored and received nonces
Expand All @@ -238,13 +270,21 @@ async function processChannelMessage(
// use the information from this error to sync, then retry your update

// FIXME: We don't need to pass everything over the wire here, fix that
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.StaleUpdateNonce, requestedUpdate, storedState, { counterpartyLatestUpdate }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.StaleUpdateNonce, requestedUpdate, storedState, {
counterpartyLatestUpdate,
}),
);
}

// If we are behind by more than 3, we cannot sync from their latest
// update, and must use restore
if (diff.gte(3)) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.StaleChannelNonce, requestedUpdate, storedState, { counterpartyLatestUpdate }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.StaleChannelNonce, requestedUpdate, storedState, {
counterpartyLatestUpdate,
}),
);
}

// If the update nonce is ahead of the store nonce by 2, we are
Expand All @@ -256,25 +296,43 @@ async function processChannelMessage(
// Create the proper state to play the update on top of using the
// latest update
if (!counterpartyLatestUpdate) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.StaleChannelNonceNoUpdate, counterpartyLatestUpdate, storedState, { requestedUpdate }));
return handleError(
new ChannelUpdateError(
ChannelUpdateError.reasons.StaleChannelNonceNoUpdate,
counterpartyLatestUpdate,
storedState,
{ requestedUpdate },
),
);
}
try {
previousState = await mergeUpdate(counterpartyLatestUpdate, storedState);
previousState = await applyUpdate(counterpartyLatestUpdate, storedState);
} catch (e) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.MergeUpdateFailed, counterpartyLatestUpdate, storedState, { requestedUpdate, error: e.message, stack: e.stack }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.applyUpdateFailed, counterpartyLatestUpdate, storedState, {
requestedUpdate,
error: e.message,
stack: e.stack,
}),
);
}
}

// We now have the latest state for the update, and should be
// able to play it on top of the update
let response: ChannelState | string;
try {
response = await mergeUpdate(requestedUpdate, previousState);
response = await applyUpdate(requestedUpdate, previousState);
} catch (e) {
response = e.message;
}
if (typeof response === "string") {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.MergeUpdateFailed, requestedUpdate, previousState, { counterpartyLatestUpdate, error: response }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.applyUpdateFailed, requestedUpdate, previousState, {
counterpartyLatestUpdate,
error: response,
}),
);
}

// If the update was single signed, the counterparty is proposing
Expand All @@ -287,11 +345,18 @@ async function processChannelMessage(
signed = requestedUpdate.commitment.addSignature(sig);
await storeService.saveChannelState(response);
} catch (e) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.SaveChannelFailed, requestedUpdate, previousState, { error: e.message }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.SaveChannelFailed, requestedUpdate, previousState, {
error: e.message,
}),
);
}

// Send the latest update to the node
await messagingService.send(from, { update: {...requestedUpdate, commitment: signed}, latestUpdate: response.latestUpdate });
await messagingService.send(from, {
update: { ...requestedUpdate, commitment: signed },
latestUpdate: response.latestUpdate,
});
return;
}

Expand All @@ -300,14 +365,11 @@ async function processChannelMessage(
try {
await storeService.saveChannelState(response);
} catch (e) {
return handleError(new ChannelUpdateError(ChannelUpdateError.reasons.SaveChannelFailed, requestedUpdate, previousState, { error: e.message }));
return handleError(
new ChannelUpdateError(ChannelUpdateError.reasons.SaveChannelFailed, requestedUpdate, previousState, {
error: e.message,
}),
);
}
stateEvt.post(response);
}

// Creates a new state from the given update
async function mergeUpdate(update: ChannelUpdate, state: ChannelState): Promise<ChannelState> {
// TODO should this just exist in the store?
await validate(update, state);
throw new Error("Method not implemented");
}
Loading