Skip to content

Commit

Permalink
Merge pull request #46 from earthstar-project/peer-syncing
Browse files Browse the repository at this point in the history
more organized rewrite of peer client and server
  • Loading branch information
cinnamon-bun committed May 13, 2021
2 parents fd5a3e6 + 8ae5b24 commit 83c4155
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 168 deletions.
197 changes: 79 additions & 118 deletions src/peer/peer-client.ts
Original file line number Diff line number Diff line change
@@ -1,154 +1,115 @@
import { WorkspaceAddress } from '../util/doc-types';
import { ICrypto } from '../crypto/crypto-types';
import { StorageId } from '../storage/storage-types';
import {
CommonWorkspacesAndPeerId,
IPeer,
IPeerClient,
IPeerServer,
PeerId,
SaltyHandshake_Outcome,
SaltyHandshake_Request,
SaltyHandshake_Response,
saltAndHashWorkspace,
} from './peer-types';

import { microsecondNow } from '../util/misc';
import { sortedInPlace } from '../storage/compare';

//--------------------------------------------------

import { Logger } from '../util/log';
import { workspaceAddressChars } from '../core-validators/characters';
import { microsecondNow } from '../util/misc';
let logger = new Logger('peer client', 'greenBright');
let loggerDo = new Logger('peer client: do', 'green');
let loggerProcess = new Logger('peer client: process', 'cyan');
let loggerUpdate = new Logger('peer client: update', 'blue');
let J = JSON.stringify;

//================================================================================

interface PeerInfo {
peerId: PeerId,
lastConnectedTimestamp: number,
commonWorkspaces: WorkspaceAddress[],
// TODO: we might want to access this by storageId, by workspace, or by peer + workspace...
storageInfos: Map<StorageId, StorageInfo>,
// Data we learn from talking to the server.
// Null means not known yet.
// This should be easily serializable.
interface ClientState {
serverPeerId: PeerId | null;
commonWorkspaces: WorkspaceAddress[] | null;
lastSeenAt: number | null, // a timestamp in Earthstar-style microseconds
}

interface StorageInfo {
storageId: StorageId,
peerId: PeerId,
workspace: WorkspaceAddress,
maxLocalIndexReceived: number,
maxLocalIndexSent: number,
// TODO: how to find out what the server wants from us?
let defaultClientState: ClientState = {
serverPeerId: null,
commonWorkspaces: null,
lastSeenAt: null,
}

let _defaultPeerInfo = (peerId: PeerId): PeerInfo => ({
peerId,
lastConnectedTimestamp: microsecondNow(),
commonWorkspaces: [],
storageInfos: new Map<StorageId, StorageInfo>(),
});

export class PeerClient implements IPeerClient {
// remember some things about each peer we've talked to
peerInfos: Map<PeerId, PeerInfo>;

constructor(public peer: IPeer, public crypto: ICrypto) {
this.peerInfos = new Map<PeerId, PeerInfo>();
}

async syncWithPeer(server: IPeerServer): Promise<void> {
logger.debug('sync');
let { commonWorkspaces, serverPeerId } =
await this.discoverCommonWorkspacesAndServerPeerId(server);
logger.debug(`...sync: got ${commonWorkspaces.length} common workspaces`);

// TODO: request info about storages we have in common

let peerInfo = this.peerInfos.get(serverPeerId) ?? _defaultPeerInfo(serverPeerId);

for (let workspace of commonWorkspaces) {
logger.debug(`...sync: doing workspace "${workspace}"`);
let storageInfo = peerInfo.storageInfos.get(workspace);
if (storageInfo === undefined) {
logger.debug(`...sync: ...we have not synced this workspace before`);
// TODO: get details of other storage: its storageId and maxLocalIndex
// so we can fill out a storageInfo for it
} else {
logger.debug(`...sync: ...we HAVE synced this workspace before:`);
logger.debug(storageInfo);
}
}

/*
for (let storageInfo of peerInfo.storageInfos.values()) {
if (commonWorkspaces.indexOf(storageInfo.workspace) === -1) {
// skip this storage, it's not a workspace we have in common
continue;
}
logger.debug(`...sync: doing workspace "${storageInfo.workspace}"`);
logger.debug(`...sync: so far, max local index received = ${storageInfo.maxLocalIndexReceived}`);
logger.debug(`...sync: so far, max local index sent = ${storageInfo.maxLocalIndexSent}`);
}
*/

/*
for (let workspace of commonWorkspaces) {
logger.debug(`...sync: doing workspace "${workspace}"`);
logger.debug(`...(TODO)`);
// TODO: getDocuments(server, maxLocalIndex) and ingest them here
// TODO: push documents from us to the server (which docs?)
}
*/

logger.debug('...sync: done');
crypto: ICrypto;
peer: IPeer;
server: IPeerServer;

state: ClientState = { ...defaultClientState };

// Each client only talks to one server.
constructor(crypto: ICrypto, peer: IPeer, server: IPeerServer) {
// TODO: load / save the client state (to where?)

logger.debug('peerClient constructor');
this.crypto = crypto;
this.peer = peer;
this.server = server;
logger.debug(`...peerId: ${this.peer.peerId}`);
logger.debug(`...client state:`);
logger.debug(this.state);
}

async syncWorkspace(server: IPeerServer): Promise<void> {
// do the entire thing
async do_saltyHandshake(): Promise<void> {
loggerDo.debug('do_saltyHandshake...');
let request: SaltyHandshake_Request = {};
loggerDo.debug('...asking server to serve_ ...');
let response = await this.server.serve_saltyHandshake(request);
loggerDo.debug('...client is going to process_ ...');
let outcome = await this.process_saltyHandshake(response);
loggerDo.debug('...client is going to update_ ...');
await this.update_saltyHandshake(outcome);
loggerDo.debug('...do_saltyHandshake is done');
}

/**
* This is the first step in talking with a server, so we discover a couple of things:
* - common workspaces
* - server's peerId
*/
async discoverCommonWorkspacesAndServerPeerId(server: IPeerServer): Promise<CommonWorkspacesAndPeerId> {
logger.debug(`discoverCommonWorkspaces`);

// talk to server. get peerId and salted workspaces
let {
peerId: serverPeerId,
salt,
saltedWorkspaces: serverSaltedWorkspaces
} = await server.saltedWorkspaces();
// this does any computation or complex work needed to boil this down
// into a simple state update, but it does not actually update our state,
// it just returns the changes to the state
async process_saltyHandshake(res: SaltyHandshake_Response): Promise<SaltyHandshake_Outcome> {
loggerProcess.debug('process_saltyHandshake...');

// figure out which workspaces we have in common
let commonWorkspacesSet = new Set<string>();
let serverSaltedSet = new Set<string>(serverSaltedWorkspaces);
for (let myWorkspace of this.peer.workspaces()) {
let mySalted = saltAndHashWorkspace(this.crypto, salt, myWorkspace);
if (serverSaltedSet.has(mySalted)) {
commonWorkspacesSet.add(myWorkspace);
// by salting and hashing our own workspaces in the same way
// the server did, and seeing what matches
let serverSaltedSet = new Set<string>(res.saltedWorkspaces);
let commonWorkspaceSet = new Set<WorkspaceAddress>();
for (let plainWs of this.peer.workspaces()) {
let saltedWs = saltAndHashWorkspace(this.crypto, res.salt, plainWs);
if (serverSaltedSet.has(saltedWs)) {
commonWorkspaceSet.add(plainWs);
}
}
let commonWorkspaces = sortedInPlace([...commonWorkspacesSet]);

// remember some facts about this server
logger.debug('server details before:', this.peerInfos.get(serverPeerId));
// load existing peerInfo
let peerInfo: PeerInfo = this.peerInfos.get(serverPeerId) ?? _defaultPeerInfo(serverPeerId);
// update peerInfo
peerInfo = {
...peerInfo,
lastConnectedTimestamp: microsecondNow(),
commonWorkspaces: commonWorkspaces,
}
this.peerInfos.set(serverPeerId, peerInfo);
logger.debug('server details after:', peerInfo);
let commonWorkspaces = sortedInPlace([...commonWorkspaceSet]);

logger.debug(`...${commonWorkspaces.length} common workspaces: ${J(commonWorkspaces)}`);
return {
let outcome: SaltyHandshake_Outcome = {
serverPeerId: res.serverPeerId,
commonWorkspaces,
serverPeerId,
}
};
loggerProcess.debug('...process_saltyHandshake is done:');
loggerProcess.debug(outcome);
return outcome;
}

// this applies the changes to the state
async update_saltyHandshake(outcome: SaltyHandshake_Outcome): Promise<void> {
loggerUpdate.debug('update_saltyHandshake...');
this.state.serverPeerId = outcome.serverPeerId;
this.state.commonWorkspaces = outcome.commonWorkspaces;
this.state.lastSeenAt = Math.max(
this.state.lastSeenAt ?? 0,
microsecondNow(),
);
loggerUpdate.debug('...update_saltyHandshake is done. client state is:');
loggerUpdate.debug(this.state);
}
}
37 changes: 25 additions & 12 deletions src/peer/peer-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,45 @@ import { ICrypto } from '../crypto/crypto-types';
import {
IPeer,
IPeerServer,
SaltAndSaltedWorkspaces,
SaltyHandshake_Request,
SaltyHandshake_Response,
saltAndHashWorkspace,
} from './peer-types';
} from "./peer-types";
import { randomId } from '../util/misc';

//--------------------------------------------------

import { Logger } from '../util/log';
let logger = new Logger('peer server', 'yellowBright');
let logger = new Logger('peer server', 'magentaBright');
let loggerServe = new Logger('peer server: serve', 'magenta');
let J = JSON.stringify;

//================================================================================

export class PeerServer implements IPeerServer {
constructor(public peer: IPeer, public crypto: ICrypto) {
crypto: ICrypto;
peer: IPeer;
constructor(crypto: ICrypto, peer: IPeer) {
logger.debug('peerServer constructor');
this.crypto = crypto;
this.peer = peer;
logger.debug(`...peerId: ${this.peer.peerId}`);
}
async saltedWorkspaces(): Promise<SaltAndSaltedWorkspaces> {
let salt = 'salt-' + randomId();
let saltedWorkspaces = this.peer.workspaces()
.map(ws => saltAndHashWorkspace(this.crypto, salt, ws));
let result = {
peerId: this.peer.peerId,
// this does not affect any internal state, in fact
// the server has no internal state (except maybe for
// rate limiting, etc)
async serve_saltyHandshake(req: SaltyHandshake_Request): Promise<SaltyHandshake_Response> {
loggerServe.debug('serve_saltyHandshake...');
let salt = randomId();
let saltedWorkspaces = this.peer.workspaces().map(ws =>
saltAndHashWorkspace(this.crypto, salt, ws));
let response: SaltyHandshake_Response = {
serverPeerId: this.peer.peerId,
salt,
saltedWorkspaces,
}
logger.debug(`saltedWorkspaces: ${J(result, null, 2)}`);
return result;
loggerServe.debug('...serve_saltyHandshake is done:');
loggerServe.debug(response);
return response;
}
}
71 changes: 57 additions & 14 deletions src/peer/peer-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,72 @@ export interface IPeer {
//================================================================================
// CLIENT AND SERVER

export interface SaltAndSaltedWorkspaces {
peerId: PeerId,
/**
* Every kind of API endpoint follows the same pattern here:
* - Client always initiates contact.
* - client.do_thing(thing_request) => void
* - client asks for server.serve_thing(thing_request) => thing_response
* - client.process_thing(thing_response) => thing_outcome
* - client.update_thing(thing_outcome) => void
*
* FUNCTION DATA TYPE
*
* x_request
* client.do_x
* server.serve_x
* x_response
* client.process_x
* x_outcome
* client.update_x
* void
*
*/

// ok this isn't a type, but I put it here anyway since it's shared code for client and server
export let saltAndHashWorkspace = (crypto: ICrypto, salt: string, workspace: WorkspaceAddress): string =>
crypto.sha256base32(salt + workspace + salt);

//--------------------------------------------------
// SALTY HANDSHAKE

export interface SaltyHandshake_Request {
}
export interface SaltyHandshake_Response {
serverPeerId: PeerId,
salt: string,
saltedWorkspaces: string[],
}

export interface CommonWorkspacesAndPeerId {
export interface SaltyHandshake_Outcome {
serverPeerId: PeerId
commonWorkspaces: WorkspaceAddress[],
serverPeerId: PeerId,
}

// ok this isn't a type, but I put it here anyway.
export let saltAndHashWorkspace = (crypto: ICrypto, salt: string, workspace: WorkspaceAddress): string =>
crypto.sha256base32(salt + workspace + salt);

// one server can talk to many clients.
//--------------------------------------------------

export interface IPeerClient {
syncWithPeer(server: IPeerServer): Promise<void>;
discoverCommonWorkspacesAndServerPeerId(server: IPeerServer): Promise<CommonWorkspacesAndPeerId>;
// Each client only talks to one server.

// do the entire thing
do_saltyHandshake(): Promise<void>;

// process and update are split into two functions
// for easier testing.

// this does any computation or complex work needed to boil this down
// into a simple state update, but it does not actually update our state,
// it just returns the changes to the state
process_saltyHandshake(res: SaltyHandshake_Response): Promise<SaltyHandshake_Outcome>;

// this applies the changes to the state
update_saltyHandshake(outcome: SaltyHandshake_Outcome): Promise<void>;

}

//--------------------------------------------------

export interface IPeerServer {
saltedWorkspaces(): Promise<SaltAndSaltedWorkspaces>
// storageDetails(workspaces): Promise<TODO>
// this does not affect any internal state, in fact
// the server has no internal state (except maybe for
// rate limiting, etc)
serve_saltyHandshake(req: SaltyHandshake_Request): Promise<SaltyHandshake_Response>;
}
Loading

0 comments on commit 83c4155

Please sign in to comment.