Skip to content

Commit

Permalink
feat(p2p): add reputation events (#470)
Browse files Browse the repository at this point in the history
This commit implements reputation events as described in issue #457.
Those events track the history of the behaviour of a node and are stored
in the database.
  • Loading branch information
michael1011 committed Sep 29, 2018
1 parent 3f6f11e commit 9f8ab32
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 22 deletions.
6 changes: 3 additions & 3 deletions lib/db/DB.ts
@@ -1,9 +1,7 @@
import fs from 'fs';
import path from 'path';
import assert from 'assert';
import Sequelize from 'sequelize';
import Bluebird from 'bluebird';

import Logger from '../Logger';
import { db } from '../types';
import { SwapClients } from '../types/enums';
Expand All @@ -12,6 +10,7 @@ type Models = {
Node: Sequelize.Model<db.NodeInstance, db.NodeAttributes>;
Currency: Sequelize.Model<db.CurrencyInstance, db.CurrencyAttributes>;
Pair: Sequelize.Model<db.PairInstance, db.PairAttributes>;
ReputationEvent: Sequelize.Model<db.ReputationEventInstance, db.ReputationEventAttributes>;
};

/** A class representing a connection to a SQL database. */
Expand Down Expand Up @@ -45,11 +44,12 @@ class DB {
this.logger.error('unable to connect to the database', err);
throw err;
}
const { Node, Currency, Pair } = this.models;
const { Node, Currency, Pair, ReputationEvent } = this.models;
// sync schemas with the database in phases, according to FKs dependencies
await Promise.all([
Node.sync(),
Currency.sync(),
ReputationEvent.sync(),
]);
await Promise.all([
Pair.sync(),
Expand Down
26 changes: 26 additions & 0 deletions lib/db/models/ReputationEvent.ts
@@ -0,0 +1,26 @@
import Sequelize from 'sequelize';
import { db } from '../../types';

export default (sequelize: Sequelize.Sequelize, DataTypes: Sequelize.DataTypes) => {
const attributes: db.SequelizeAttributes<db.ReputationEventAttributes> = {
id: { type: DataTypes.INTEGER, primaryKey: true, autoIncrement: true },
event: { type: DataTypes.INTEGER, allowNull: false },
nodeId: { type: DataTypes.INTEGER, allowNull: false },
};

const options: Sequelize.DefineOptions<db.ReputationEventInstance> = {
tableName: 'reputationEvents',
timestamps: true,
updatedAt: false,
};

const ReputationEvent = sequelize.define<db.ReputationEventInstance, db.ReputationEventAttributes>('ReputationEvent', attributes, options);

ReputationEvent.associate = (models: Sequelize.Models) => {
models.ReputationEvent.belongsTo(models.Node, {
foreignKey: 'nodeId',
});
};

return ReputationEvent;
};
92 changes: 80 additions & 12 deletions lib/p2p/NodeList.ts
@@ -1,17 +1,39 @@
import { EventEmitter } from 'events';
import P2PRepository from './P2PRepository';
import { NodeInstance, NodeFactory } from '../types/db';
import { Address } from '../types/p2p';
import addressUtils from '../utils/addressUtils';
import { ReputationEvent } from '../types/enums';

const reputationEventWeight = {
[ReputationEvent.ManualBan]: Number.NEGATIVE_INFINITY,
[ReputationEvent.ManualUnban]: 0,
[ReputationEvent.PacketTimeout]: -1,
[ReputationEvent.SwapFailure]: -10,
[ReputationEvent.SwapSuccess]: 1,
};

// TODO: inform node about getting banned
// TODO: remove reputation events after certain amount of time

interface NodeList {
on(event: 'node.ban', listener: (nodePubKey: string) => void): this;
emit(event: 'node.ban', nodePubKey: string): boolean;
}

/** Represents a list of nodes for managing network peers activity */
class NodeList {
class NodeList extends EventEmitter {
private nodes = new Map<string, NodeInstance>();

private static readonly banThreshold = -50;

public get count() {
return this.nodes.size;
}

constructor(private repository: P2PRepository) {}
constructor(private repository: P2PRepository) {
super();
}

/**
* Check if a node with a given nodePubKey exists.
Expand All @@ -29,13 +51,7 @@ class NodeList {
* @returns true if the node was banned, false otherwise
*/
public ban = async (nodePubKey: string): Promise<boolean> => {
const node = this.nodes.get(nodePubKey);
if (node) {
node.banned = true;
await node.save();
return true;
}
return false;
return this.addReputationEvent(nodePubKey, ReputationEvent.ManualBan);
}

public isBanned = (nodePubKey: string): boolean => {
Expand All @@ -49,8 +65,14 @@ class NodeList {
public load = async (): Promise<void> => {
const nodes = await this.repository.getNodes();

nodes.forEach((node) => {
this.nodes.set(node.nodePubKey, node);
nodes.forEach(async (node) => {
if (!node.banned) {
this.nodes.set(node.nodePubKey, node);
const events = await this.repository.getReputationEvents(node);
events.forEach(({ event }) => {
this.updateReputationScore(node, event);
});
}
});
}

Expand All @@ -67,7 +89,7 @@ class NodeList {
* Update a node's addresses.
* @return true if the specified node exists and was updated, false otherwise
*/
public updateAddresses = async (nodePubKey: string, addresses: Address[] = [], lastAddress?: Address) => {
public updateAddresses = async (nodePubKey: string, addresses: Address[] = [], lastAddress?: Address): Promise<boolean> => {
const node = this.nodes.get(nodePubKey);
if (node) {
// avoid overriding the `lastConnected` field for existing matching addresses unless a new value was set
Expand All @@ -91,6 +113,36 @@ class NodeList {
return false;
}

/**
* Add a reputation event to the node's history
* @return true if the specified node exists and the event was added, false otherwise
*/
public addReputationEvent = async (nodePubKey: string, event: ReputationEvent): Promise<boolean> => {
const node = this.nodes.get(nodePubKey);
if (node) {
const promises: PromiseLike<any>[] = [
this.repository.addReputationEvent({ event, nodeId: node.id }),
];

this.updateReputationScore(node, event);

if (node.reputationScore < NodeList.banThreshold) {
promises.push(this.setBanStatus(node, true));
this.emit('node.ban', nodePubKey);
} else if (node.banned) {
// If the reputationScore is not below the banThreshold but node.banned
// is true that means that the node was unbanned
promises.push(this.setBanStatus(node, false));
}

await Promise.all(promises);

return true;
}

return false;
}

public removeAddress = async (nodePubKey: string, address: Address) => {
const node = this.nodes.get(nodePubKey);
if (node) {
Expand All @@ -109,6 +161,22 @@ class NodeList {

return false;
}

private updateReputationScore = (node: NodeInstance, event: ReputationEvent) => {
switch (event) {
case (ReputationEvent.ManualBan):
case (ReputationEvent.ManualUnban): {
node.reputationScore = reputationEventWeight[event];
break;
}
default: node.reputationScore += reputationEventWeight[event];
}
}

private setBanStatus = (node: NodeInstance, status: boolean) => {
node.banned = status;
return node.save();
}
}

export default NodeList;
16 changes: 14 additions & 2 deletions lib/p2p/P2PRepository.ts
@@ -1,19 +1,31 @@
import Logger from '../Logger';
import { Models } from '../db/DB';
import { db } from '../types';
import { NodeInstance } from '../types/db';

class P2PRepository {

constructor(private logger: Logger, private models: Models) {}
constructor(private models: Models) {}

public getNodes = async (): Promise<db.NodeInstance[]> => {
return this.models.Node.findAll();
}

public getReputationEvents = async (node: NodeInstance): Promise<db.ReputationEventInstance[]> => {
return this.models.ReputationEvent.findAll({
where: {
nodeId: node.id,
},
});
}

public addNode = (node: db.NodeFactory) => {
return this.models.Node.create(<db.NodeAttributes>node);
}

public addReputationEvent = async (event: db.ReputationEventFactory) => {
return this.models.ReputationEvent.create(<db.ReputationEventAttributes>event);
}

public addNodes = async (nodes: db.NodeFactory[]) => {
return this.models.Node.bulkCreate(<db.NodeAttributes[]>nodes);
}
Expand Down
20 changes: 17 additions & 3 deletions lib/p2p/Pool.ts
Expand Up @@ -79,7 +79,7 @@ class Pool extends EventEmitter {
this.addresses.push(address);
});
}
this.nodes = new NodeList(new P2PRepository(logger, models));
this.nodes = new NodeList(new P2PRepository(models));
}

public get peerCount(): number {
Expand Down Expand Up @@ -122,8 +122,11 @@ class Pool extends EventEmitter {
this.handshakeData.addresses = this.addresses;

this.logger.info('Connecting to known / previously connected peers');
await this.nodes.load();
this.connectNodes(this.nodes, false, true).then(() => {
this.bindNodeList();

this.nodes.load().then(() => {
return this.connectNodes(this.nodes, false, true);
}).then(() => {
this.logger.info('Completed start-up connections to known peers.');
}).catch((reason) => {
this.logger.error('Unexpected error connecting to known peers on startup', reason);
Expand All @@ -148,6 +151,17 @@ class Pool extends EventEmitter {
this.connected = false;
}

private bindNodeList = () => {
this.nodes.on('node.ban', (nodePubKey) => {
this.logger.warn(`node ${nodePubKey} was banned`);

const peer = this.peers.get(nodePubKey);
if (peer) {
peer.close();
}
});
}

private verifyReachability = () => {
this.handshakeData.addresses!.forEach(async (address) => {
const externalAddress = addressUtils.toString(address);
Expand Down
18 changes: 17 additions & 1 deletion lib/types/db.ts
@@ -1,6 +1,7 @@
import Sequelize, { DataTypeAbstract, DefineAttributeColumnOptions } from 'sequelize';
import { Address, NodeConnectionInfo } from './p2p';
import { Currency, Pair } from './orders';
import { ReputationEvent } from './enums';

export type SequelizeAttributes<T extends { [key: string]: any }> = {
[P in keyof T]: string | DataTypeAbstract | DefineAttributeColumnOptions
Expand Down Expand Up @@ -34,7 +35,9 @@ export type NodeAttributes = NodeFactory & {
lastAddress: Address;
};

export type NodeInstance = NodeAttributes & Sequelize.Instance<NodeAttributes>;
export type NodeInstance = NodeAttributes & Sequelize.Instance<NodeAttributes> & {
reputationScore: number;
};

export type PairFactory = Pair;

Expand All @@ -43,3 +46,16 @@ export type PairAttributes = PairFactory & {
};

export type PairInstance = PairAttributes & Sequelize.Instance<PairAttributes>;

export type ReputationEventFactory = {
event: ReputationEvent;
nodeId: number;
};

export type ReputationEventAttributes = ReputationEventFactory & {
id: number;
};

export type ReputationEventInstance = ReputationEventAttributes & Sequelize.Instance<ReputationEventAttributes> & {
createdAt: number;
};
8 changes: 8 additions & 0 deletions lib/types/enums.ts
Expand Up @@ -40,3 +40,11 @@ export enum SwapDealState {
Error = 1,
Completed = 2,
}

export enum ReputationEvent {
ManualBan = 0,
ManualUnban = 1,
PacketTimeout = 2,
SwapFailure = 3,
SwapSuccess = 4,
}
2 changes: 1 addition & 1 deletion test/integration/OrderBook.spec.ts
Expand Up @@ -24,7 +24,7 @@ describe('OrderBook', () => {
await db.init();

orderBookRepository = new OrderBookRepository(loggers.orderbook, db.models);
const p2pRepository = new P2PRepository(loggers.p2p, db.models);
const p2pRepository = new P2PRepository(db.models);

await p2pRepository.addNode(
{ nodePubKey: nodeKey.nodePubKey, addresses: [] },
Expand Down
15 changes: 15 additions & 0 deletions test/p2p/sanity.spec.ts
Expand Up @@ -3,6 +3,7 @@ import Xud from '../../lib/Xud';
import chaiAsPromised from 'chai-as-promised';
import { getUri } from '../../lib/utils/utils';
import { getUnusedPort } from '../utils';
import { ReputationEvent } from '../../lib/types/enums';

chai.use(chaiAsPromised);

Expand Down Expand Up @@ -99,6 +100,20 @@ describe('P2P Sanity Tests', () => {
await expect(connectPromise).to.be.rejectedWith(`could not connect to peer at localhost:${port}`);
});

it('should disconnect from a node after banning it', async () => {
await nodeOne.service.connect({ nodeUri: nodeTwoUri });

const nodeList = nodeOne['pool']['nodes'];

const banned = await nodeList.addReputationEvent(nodeTwo.nodePubKey, ReputationEvent.ManualBan);
expect(banned).to.be.true;

expect(nodeList.isBanned(nodeTwo.nodePubKey)).to.be.true;

const listPeersResult = await nodeOne.service.listPeers();
expect(listPeersResult.length).to.be.equal(0);
});

after(async () => {
await Promise.all([nodeOne['shutdown'](), nodeTwo['shutdown']()]);
});
Expand Down

0 comments on commit 9f8ab32

Please sign in to comment.