Skip to content

Commit

Permalink
feat: refactored hole punch signalling procedure
Browse files Browse the repository at this point in the history
* general refactor of the signalling protocol.
* Added signatures and verification to the signalling requests and relay messages. #148
  • Loading branch information
tegefaulkes committed Oct 24, 2023
1 parent 5b01c9e commit 9cd245a
Show file tree
Hide file tree
Showing 21 changed files with 1,207 additions and 704 deletions.
245 changes: 143 additions & 102 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { LockRequest } from '@matrixai/async-locks';
import type { ResourceAcquire } from '@matrixai/resources';
import type { ContextTimedInput, ContextTimed } from '@matrixai/contexts';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { ClientCryptoOps, QUICConnection } from '@matrixai/quic';
import type NodeGraph from './NodeGraph';
import type {
Expand All @@ -21,15 +20,15 @@ import type {
TLSConfig,
} from '../network/types';
import type { ServerManifest } from '@matrixai/rpc';
import type { HolePunchRelayMessage } from './agent/types';
import Logger from '@matrixai/logger';
import { withF } from '@matrixai/resources';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
import { IdInternal } from '@matrixai/id';
import { Lock, LockBox } from '@matrixai/async-locks';
import { Lock, LockBox, Semaphore } from '@matrixai/async-locks';
import { Timer } from '@matrixai/timer';
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
import { AbstractEvent, EventAll } from '@matrixai/events';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import {
QUICSocket,
QUICServer,
Expand All @@ -43,11 +42,11 @@ import * as nodesUtils from './utils';
import * as nodesErrors from './errors';
import * as nodesEvents from './events';
import manifestClientAgent from './agent/callers';
import * as ids from '../ids';
import * as keysUtils from '../keys/utils';
import * as networkUtils from '../network/utils';
import * as utils from '../utils';
import config from '../config';
import RateLimiter from '../rateLimiter/RateLimiter';

type ManifestClientAgent = typeof manifestClientAgent;

Expand Down Expand Up @@ -129,6 +128,22 @@ class NodeConnectionManager {
* Default timeout for RPC handlers
*/
public readonly rpcCallTimeoutTime: number;
/**
* Used to track active hole punching attempts
*/
protected activePunchMap = new Map<string, PromiseCancellable<void>>();
/**
* Used to rate limit punch attempts per IP Address
*/
protected activeAddressMap = new Map<string, Semaphore>();
/**
* Used track active signalling attempts
*/
protected activeSignalSet = new Set<PromiseCancellable<void>>();
/**
* Used to limit signalling requests on a per requester basis
*/
protected rateLimiter = new RateLimiter(60000, 20, 10, 1);

protected logger: Logger;
protected keyRing: KeyRing;
Expand Down Expand Up @@ -456,11 +471,13 @@ class NodeConnectionManager {
this.handleEventQUICServerConnection,
);
this.quicSocket.addEventListener(EventAll.name, this.handleEventAll);
this.rateLimiter.startRefillInterval();
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop() {
this.logger.info(`Stop ${this.constructor.name}`);
this.rateLimiter.stop();

this.removeEventListener(
nodesEvents.EventNodeConnectionManagerError.name,
Expand Down Expand Up @@ -503,6 +520,16 @@ class NodeConnectionManager {
destroyProms.push(destroyProm);
}
await Promise.all(destroyProms);
const signallingProms: Array<PromiseCancellable<void>> = [];
for (const [, activePunch] of this.activePunchMap) {
signallingProms.push(activePunch);
activePunch.cancel();
}
for (const activeSignal of this.activeSignalSet) {
signallingProms.push(activeSignal);
activeSignal.cancel();
}
await Promise.allSettled(signallingProms);
await this.quicServer.stop({ force: true });
await this.quicSocket.stop({ force: true });
await this.rpcServer.stop({ force: true });
Expand Down Expand Up @@ -904,12 +931,7 @@ class NodeConnectionManager {
// 3. if already exists then clean up
await connection.destroy({ force: true });
// I can only see this happening as a race condition with creating a forward connection and receiving a reverse.
// FIXME: only here to see if this condition happens.
// this NEEDS to be removed, but I want to know if this branch happens at all.
throw Error(
'TMP IMP, This should be exceedingly rare, lets see if it happens',
);
// Return;
return;
}
// Final setup
const newConnAndTimer = this.addConnection(nodeId, connection);
Expand Down Expand Up @@ -1074,11 +1096,11 @@ class NodeConnectionManager {
* @param port port of the target client.
* @param ctx
*/
public async holePunchReverse(
public holePunchReverse(
host: Host,
port: Port,
ctx?: Partial<ContextTimed>,
): Promise<void>;
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(
true,
Expand Down Expand Up @@ -1378,106 +1400,129 @@ class NodeConnectionManager {
}

/**
* Performs an RPC request to send a hole-punch message to the target. Used to
* initially establish the NodeConnection from source to target.
* This is used by the `nodesHolePunchRequestHandler` to initiate the hole punch procedure.
*
* @param relayNodeId node ID of the relay node (i.e. the seed node)
* @param sourceNodeId node ID of the current node (i.e. the sender)
* @param targetNodeId node ID of the target node to hole punch
* @param address
* @param ctx
* Will validate the message, and initiate hole punching in the background and return immediately
*/
public sendSignalingMessage(
relayNodeId: NodeId,
sourceNodeId: NodeId,
targetNodeId: NodeId,
address?: NodeAddress,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async sendSignalingMessage(
relayNodeId: NodeId,
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public handleNodesConnectionSignalFinal(host: Host, port: Port) {
const id = `${host}:${port}`;
if (this.activePunchMap.has(id)) return;
// Checking for resource semaphore
let semaphore: Semaphore | undefined = this.activeAddressMap.get(host);
if (semaphore == null) {
semaphore = new Semaphore(3);
this.activeAddressMap.set(host, semaphore);
}
const holePunchAttempt = new PromiseCancellable<void>(
async (res, rej, signal) => {
await semaphore!.withF(async () => {
this.holePunchReverse(host, port, { signal })
.finally(() => {
this.activePunchMap.delete(id);
if (semaphore!.count === 0) {
this.activeAddressMap.delete(host);
}
})
.then(res, rej);
});
},
);
this.activePunchMap.set(id, holePunchAttempt);
}

/**
* The handler used by the RPC to process signalling requests
* @param sourceNodeId - NodeId of the node making the request. Used for rate limiting.
* @param targetNodeId - NodeId of the node that needs to initiate hole punching.
* @param address - Address the target needs to punch to.
* @param requestSignature - `base64url` encoded signature
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public handleNodesConnectionSignalInitial(
sourceNodeId: NodeId,
targetNodeId: NodeId,
address: NodeAddress | undefined,
@context ctx: ContextTimed,
): Promise<void> {
if (
this.keyRing.getNodeId().equals(relayNodeId) ||
this.keyRing.getNodeId().equals(targetNodeId)
) {
// Logging and silently dropping operation
this.logger.debug(
'Attempted to send signaling message to our own NodeId',
);
return;
address: NodeAddress,
requestSignature: string,
) {
// Need to get the connection details of the requester and add it to the message.
// Then send the message to the target.
// This would only function with existing connections
if (!this.hasConnection(targetNodeId)) {
throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound();
}
const rlyNode = nodesUtils.encodeNodeId(relayNodeId);
const srcNode = nodesUtils.encodeNodeId(sourceNodeId);
const tgtNode = nodesUtils.encodeNodeId(targetNodeId);
const addressString =
address != null ? `, address: ${address.host}:${address.port}` : '';
this.logger.debug(
`sendSignalingMessage sending Signaling message relay: ${rlyNode}, source: ${srcNode}, target: ${tgtNode}${addressString}`,
// Do other checks.
const sourceNodeIdString = sourceNodeId.toString();
if (!this.rateLimiter.consume(sourceNodeIdString)) {
throw new nodesErrors.ErrorNodeConnectionManagerRequestRateExceeded();
}
// Generating relay signature, data is just `<sourceNodeId><targetNodeId><Address><requestSignature>` concatenated
const data = Buffer.concat([
sourceNodeId,
targetNodeId,
Buffer.from(JSON.stringify(address), 'utf-8'),
Buffer.from(requestSignature, 'base64url'),
]);
const relaySignature = keysUtils.signWithPrivateKey(
this.keyRing.keyPair,
data,
);
// Send message and ignore any error
await this.withConnF(
relayNodeId,
async (connection) => {
const client = connection.getClient();
await client.methods.nodesHolePunchMessageSend(
{
srcIdEncoded: srcNode,
dstIdEncoded: tgtNode,
address,
},
ctx,
);
},
ctx,
).catch(() => {});
const connProm = this.withConnF(targetNodeId, async (conn) => {
const client = conn.getClient();
await client.methods.nodesConnectionSignalFinal({
sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
address,
requestSignature: requestSignature,
relaySignature: relaySignature.toString('base64url'),
});
}).finally(() => {
this.activeSignalSet.delete(connProm);
});
this.activeSignalSet.add(connProm);
}

/**
* Forwards a received hole punch message on to the target.
* If not known, the node ID -> address mapping is attempted to be discovered
* through Kademlia (note, however, this is currently only called by a 'broker'
* node).
* @param message the original relay message (assumed to be created in
* nodeConnection.start())
* @param sourceAddress
* This till ask a signalling node to signal a target node to hole punch back to this node.
* @param targetNodeId - NodeId of the node that needs to signal back.
* @param signallingNodeId - NodeId of the signalling node.
* @param ctx
*/
public relaySignalingMessage(
message: HolePunchRelayMessage,
sourceAddress: NodeAddress,
ctx?: Partial<ContextTimed>,
public holePunchSignalRequest(
targetNodeId: NodeId,
signallingNodeId: NodeId,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
@timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async relaySignalingMessage(
message: HolePunchRelayMessage,
sourceAddress: NodeAddress,
public async holePunchSignalRequest(
targetNodeId: NodeId,
signallingNodeId: NodeId,
@context ctx: ContextTimed,
): Promise<void> {
// First check if we already have an existing ID -> address record
// If we're relaying then we trust our own node graph records over
// what was provided in the message
const sourceNode = ids.parseNodeId(message.srcIdEncoded);
await this.sendSignalingMessage(
ids.parseNodeId(message.dstIdEncoded),
sourceNode,
ids.parseNodeId(message.dstIdEncoded),
sourceAddress,
await this.withConnF(
signallingNodeId,
async (conn) => {
const client = conn.getClient();
const sourceNodeId = this.keyRing.getNodeId();
// Data is just `<sourceNodeId><targetNodeId>` concatenated
const data = Buffer.concat([sourceNodeId, targetNodeId]);
const signature = keysUtils.signWithPrivateKey(
this.keyRing.keyPair,
data,
);
await client.methods.nodesConnectionSignalInitial(
{
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
signature: signature.toString('base64url'),
},
ctx,
);
},
ctx,
);
}
Expand Down Expand Up @@ -1657,19 +1702,15 @@ class NodeConnectionManager {
const allProms: Array<Promise<Array<void>>> = [];
for (const targetNodeId of targetNodeIds) {
if (!this.isSeedNode(targetNodeId)) {
// Ask seed nodes to signal hole punching for target
const holePunchProms = seedNodes.map((seedNodeId) => {
return (
this.sendSignalingMessage(
seedNodeId,
this.keyRing.getNodeId(),
targetNodeId,
undefined,
ctx,
)
this.holePunchSignalRequest(targetNodeId, seedNodeId, ctx)
// Ignore results
.then(
() => {},
() => {},
(e) =>
this.logger.debug(`signal request failed with ${e.message}`),
)
);
});
Expand Down
9 changes: 6 additions & 3 deletions src/nodes/agent/callers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import nodesClaimsGet from './nodesClaimsGet';
import nodesClosestLocalNodesGet from './nodesClosestLocalNodesGet';
import nodesConnectionSignalFinal from './nodesConnectionSignalFinal';
import nodesConnectionSignalInitial from './nodesConnectionSignalInitial';
import nodesCrossSignClaim from './nodesCrossSignClaim';
import nodesHolePunchMessageSend from './nodesHolePunchMessageSend';
import notificationsSend from './notificationsSend';
import vaultsGitInfoGet from './vaultsGitInfoGet';
import vaultsGitPackGet from './vaultsGitPackGet';
Expand All @@ -13,8 +14,9 @@ import vaultsScan from './vaultsScan';
const manifestClient = {
nodesClaimsGet,
nodesClosestLocalNodesGet,
nodesConnectionSignalFinal,
nodesConnectionSignalInitial,
nodesCrossSignClaim,
nodesHolePunchMessageSend,
notificationsSend,
vaultsGitInfoGet,
vaultsGitPackGet,
Expand All @@ -26,8 +28,9 @@ export default manifestClient;
export {
nodesClaimsGet,
nodesClosestLocalNodesGet,
nodesConnectionSignalFinal,
nodesConnectionSignalInitial,
nodesCrossSignClaim,
nodesHolePunchMessageSend,
notificationsSend,
vaultsGitInfoGet,
vaultsGitPackGet,
Expand Down
Loading

0 comments on commit 9cd245a

Please sign in to comment.