Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connection with ICE now gets target port from signalling node #624

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
QUICSocket,
ClientCryptoOps,
QUICConnection,
Host as QUICHost,
} from '@matrixai/quic';
import type { ContextTimedInput } from '@matrixai/contexts/dist/types';
import type { X509Certificate } from '@peculiar/x509';
Expand All @@ -20,6 +21,7 @@ import {
QUICClient,
events as quicEvents,
errors as quicErrors,
utils as quicUtils,
} from '@matrixai/quic';
import { RPCClient } from '@matrixai/rpc';
import { middleware as rpcUtilsMiddleware } from '@matrixai/rpc';
Expand Down Expand Up @@ -464,9 +466,11 @@ class NodeConnection<M extends ClientManifest> {
}) {
this.validatedNodeId = validatedNodeId;
this.nodeId = nodeId;
this.host = host;
this.host = quicUtils.toCanonicalIP(host) as unknown as Host;
this.port = port;
this.localHost = localHost;
this.localHost = quicUtils.resolvesZeroIP(
localHost as unknown as QUICHost,
) as unknown as Host;
this.localPort = localPort;
this.certChain = certChain;
this.hostname = hostname;
Expand Down
92 changes: 54 additions & 38 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1028,9 +1028,19 @@ class NodeConnectionManager {
this.logger.debug(
`establishing single connection for address ${address.host}:${address.port}`,
);
const iceProm = address.scopes.includes('global')
? this.initiateHolePunch(nodeIds, ctx)
: undefined;
if (address.scopes.includes('global')) {
// Get updated address from ice procedure, using first result for now
const result = await this.initiateHolePunch(nodeIds, ctx);
for (const newAddress of result) {
if (newAddress != null) {
this.logger.debug(
`initiateHolePunch returned new ${newAddress.host}:${newAddress.port} vs old ${address.host}:${address.port}`,
);
address.host = newAddress.host as Host;
address.port = newAddress.port;
}
}
}
const connection =
await NodeConnection.createNodeConnection<ManifestClientAgent>(
{
Expand All @@ -1048,25 +1058,19 @@ class NodeConnectionManager {
),
},
ctx,
)
.catch((e) => {
this.logger.debug(
`establish single connection failed for ${address.host}:${address.port} with ${e.message}`,
);
throw e;
})
.finally(async () => {
iceProm?.cancel('Connection was established');
await iceProm;
});
).catch((e) => {
this.logger.debug(
`establish single connection failed for ${address.host}:${address.port} with ${e.message}`,
);
throw e;
});
// 2. if established then add to result map
const nodeId = connection.nodeId;
const nodeIdString = nodeId.toString() as NodeIdString;
if (connectionsResults.has(nodeIdString)) {
this.logger.debug(
`single connection already existed, cleaning up ${address.host}:${address.port}`,
);
throw Error('haha error go brrrr');
// 3. if already exists then clean up
await connection.destroy({ force: true });
// I can only see this happening as a race condition with creating a forward connection and receiving a reverse.
Expand Down Expand Up @@ -1798,18 +1802,21 @@ class NodeConnectionManager {
* @param requestSignature - `base64url` encoded signature
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public handleNodesConnectionSignalInitial(
public async handleNodesConnectionSignalInitial(
sourceNodeId: NodeId,
targetNodeId: NodeId,
address: NodeAddress,
requestSignature: string,
) {
): Promise<NodeAddress> {
// Need to get the connection details of the requester and add it to the message.
// Then send the message to the target.
// This would only function with existing connections
if (!this.hasConnection(targetNodeId)) {
const existingConnection = await this.getExistingConnection(targetNodeId);
if (existingConnection == null) {
throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound();
}
const host = existingConnection.connection.host;
const port = existingConnection.connection.port;
// Do other checks.
const sourceNodeIdString = sourceNodeId.toString();
if (!this.rateLimiter.consume(sourceNodeIdString)) {
Expand Down Expand Up @@ -1839,6 +1846,11 @@ class NodeConnectionManager {
this.activeSignalFinalPs.delete(connProm);
});
this.activeSignalFinalPs.add(connProm);
return {
host,
port,
scopes: ['global'],
};
}

/**
Expand All @@ -1858,7 +1870,7 @@ class NodeConnectionManager {
targetNodeId: NodeId,
signallingNodeId: NodeId,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
): PromiseCancellable<NodeAddress>;
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
@timedCancellable(
true,
Expand All @@ -1869,8 +1881,8 @@ class NodeConnectionManager {
targetNodeId: NodeId,
signallingNodeId: NodeId,
@context ctx: ContextTimed,
): Promise<void> {
await this.withConnF(
): Promise<NodeAddress> {
return await this.withConnF(
signallingNodeId,
async (conn) => {
const client = conn.getClient();
Expand All @@ -1881,13 +1893,20 @@ class NodeConnectionManager {
this.keyRing.keyPair,
data,
);
await client.methods.nodesConnectionSignalInitial(
{
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
signature: signature.toString('base64url'),
},
ctx,
);
const addressMessage =
await client.methods.nodesConnectionSignalInitial(
{
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
signature: signature.toString('base64url'),
},
ctx,
);
const nodeAddress: NodeAddress = {
host: addressMessage.host as Host,
port: addressMessage.port as Port,
scopes: ['global'],
};
return nodeAddress;
},
ctx,
);
Expand Down Expand Up @@ -2048,31 +2067,28 @@ class NodeConnectionManager {
protected initiateHolePunch(
targetNodeIds: Array<NodeId>,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
): PromiseCancellable<Array<NodeAddress | undefined>>;
@timedCancellable(true)
protected async initiateHolePunch(
targetNodeIds: Array<NodeId>,
@context ctx: ContextTimed,
): Promise<void> {
): Promise<Array<NodeAddress | undefined>> {
const seedNodes = this.getSeedNodes();
const allProms: Array<Promise<Array<void>>> = [];
const allProms: Array<PromiseCancellable<NodeAddress | undefined>> = [];
for (const targetNodeId of targetNodeIds) {
if (!this.isSeedNode(targetNodeId)) {
// Ask seed nodes to signal hole punching for target
const holePunchProms = seedNodes.map((seedNodeId) => {
return (
this.connectionSignalInitial(targetNodeId, seedNodeId, ctx)
// Ignore results
.then(
() => {},
() => {},
)
// Ignore errors
.catch(() => undefined)
);
});
allProms.push(Promise.all(holePunchProms));
allProms.push(...holePunchProms);
}
}
await Promise.all(allProms).catch();
return await Promise.all(allProms);
}
}

Expand Down
23 changes: 14 additions & 9 deletions src/nodes/agent/handlers/NodesConnectionSignalInitial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
AgentRPCRequestParams,
AgentRPCResponseResult,
HolePunchSignalMessage,
AddressMessage,
} from '../types';
import type NodeConnectionManager from '../../../nodes/NodeConnectionManager';
import type { Host, Port } from '../../../network/types';
Expand All @@ -19,13 +20,13 @@ class NodesConnectionSignalInitial extends UnaryHandler<
nodeConnectionManager: NodeConnectionManager;
},
AgentRPCRequestParams<HolePunchSignalMessage>,
AgentRPCResponseResult
AgentRPCResponseResult<AddressMessage>
> {
public handle = async (
input: AgentRPCRequestParams<HolePunchSignalMessage>,
_cancel,
meta: Record<string, JSONValue> | undefined,
): Promise<AgentRPCResponseResult> => {
): Promise<AgentRPCResponseResult<AddressMessage>> => {
const { nodeConnectionManager } = this.container;
// Connections should always be validated
const requestingNodeId = agentUtils.nodeIdFromMeta(meta);
Expand Down Expand Up @@ -54,13 +55,17 @@ class NodesConnectionSignalInitial extends UnaryHandler<
port: remotePort as Port,
scopes: ['global'],
};
nodeConnectionManager.handleNodesConnectionSignalInitial(
requestingNodeId,
targetNodeId,
address,
input.signature,
);
return {};
const targetAddress =
await nodeConnectionManager.handleNodesConnectionSignalInitial(
requestingNodeId,
targetNodeId,
address,
input.signature,
);
return {
host: targetAddress.host,
port: targetAddress.port,
};
};
}

Expand Down
1 change: 0 additions & 1 deletion tests/nodes/NodeConnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ describe(`${NodeConnection.name}`, () => {
logger: logger.getChild(`${QUICServer.name}`),
});
rpcServer = new RPCServer({
handlerTimeoutTime: 5000,
fromError: networkUtils.fromError,
logger: logger.getChild(`${RPCServer.name}`),
});
Expand Down
8 changes: 5 additions & 3 deletions tests/nodes/NodeConnectionManager.general.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,9 @@ describe(`${NodeConnectionManager.name} general test`, () => {
const targetNodeId = remotePolykeyAgentB.keyRing.getNodeId();
const data = Buffer.concat([sourceNodeId, targetNodeId]);
const signature = keysUtils.signWithPrivateKey(keyPair, data);
expect(() => {
await expect(async () => {
for (let i = 0; i < 30; i++) {
nodeConnectionManager.handleNodesConnectionSignalInitial(
await nodeConnectionManager.handleNodesConnectionSignalInitial(
sourceNodeId,
targetNodeId,
{
Expand All @@ -665,7 +665,9 @@ describe(`${NodeConnectionManager.name} general test`, () => {
signature.toString('base64url'),
);
}
}).toThrow(nodesErrors.ErrorNodeConnectionManagerRequestRateExceeded);
}).rejects.toThrow(
nodesErrors.ErrorNodeConnectionManagerRequestRateExceeded,
);

const signalMapA =
// @ts-ignore: kidnap protected property
Expand Down
1 change: 0 additions & 1 deletion tests/nodes/NodeConnectionManager.lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {

await nodeConnectionManager.stop();
});
// TODO: work in progress testing
test('concurrent connections should result in only 1 connection', async () => {
// A connection is concurrently established in the forward and reverse
// direction, we only want one connection to exist afterwards.
Expand Down
51 changes: 51 additions & 0 deletions tests/nodes/NodeConnectionManager.seednodes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -648,4 +648,55 @@ describe(`${NodeConnectionManager.name} seednodes test`, () => {

await nodeConnectionManager.stop();
});
test('simulating hole punching with a common node', async () => {
// We can't truly test this without a nat, so we're just going through the motions in this test.
// Will trigger signaling via the seed node, remotePolykeyAgent1 in this case.
const seedNodes = {
[remoteNodeIdEncoded1]: remoteAddress1,
};
nodeConnectionManager = new NodeConnectionManager({
keyRing,
logger: logger.getChild(NodeConnectionManager.name),
nodeGraph,
connectionConnectTimeoutTime: 1000,
tlsConfig,
seedNodes,
});
nodeManager = new NodeManager({
db,
gestaltGraph,
keyRing,
nodeConnectionManager,
nodeGraph,
sigchain,
taskManager,
connectionConnectTimeoutTime: 1000,
logger,
});
await nodeManager.start();
await nodeConnectionManager.start({
host: localHost as Host,
});

// Connect remote1 to remote2
const result1 = await remotePolykeyAgent1.nodeConnectionManager.pingNode(
remoteNodeId2,
[remoteAddress2],
);
expect(result1).toBeTrue();

// Establish connection to remotePolykeyAgent1
const result2 = await nodeConnectionManager.pingNode(remoteNodeId1, [
remoteAddress1,
]);
expect(result2).toBeTrue();

// Now we attempt to connect to remotePolykeyAgent2 with signalling
const result3 = await nodeConnectionManager.pingNode(remoteNodeId2, [
remoteAddress2,
]);
expect(result3).toBeTrue();
// Waiting for setNode to propagate
await utils.sleep(100);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ describe('nodesHolePunchSignal', () => {
// Data is just `<sourceNodeId><targetNodeId>` concatenated
const data = Buffer.concat([sourceNodeId, targetNodeId]);
const signature = keysUtils.signWithPrivateKey(keyPair, data);
dummyNodeConnectionManager.handleNodesConnectionSignalInitial.mockResolvedValue(
{
host: '127.0.0.1',
port: 55555,
scopes: ['global'],
},
);
await rpcClient.methods.nodesConnectionSignalInitial({
targetNodeIdEncoded,
signature: signature.toString('base64url'),
Expand Down