Skip to content

Commit

Permalink
feat: add entry point to request node to get monitoring status (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrolland committed Apr 8, 2020
1 parent 2927625 commit 1d9c239
Show file tree
Hide file tree
Showing 25 changed files with 800 additions and 15 deletions.
44 changes: 44 additions & 0 deletions packages/data-access/src/data-access.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as Bluebird from 'bluebird';
import { EventEmitter } from 'events';

import Block from './block';
import IgnoredLocationIndex from './ignored-location';
import IntervalTimer from './interval-timer';
import TransactionIndex from './transaction-index';

Expand All @@ -31,6 +32,11 @@ export interface IDataAccessOptions {
* Defaults to DEFAULT_INTERVAL_TIME.
*/
synchronizationIntervalTime?: number;

/**
* Index of the ignored location with the reason
*/
ignoredLocationIndex?: IgnoredLocationIndex;
}

/**
Expand All @@ -40,11 +46,14 @@ export default class DataAccess implements DataAccessTypes.IDataAccess {
// Transaction index, that allows storing and retrieving transactions by channel or topic, with time boundaries.
// public for test purpose
public transactionIndex: DataAccessTypes.ITransactionIndex;

// boolean to store the initialization state
protected isInitialized: boolean = false;
// Storage layer
private storage: StorageTypes.IStorage;

private ignoredLocationIndex: IgnoredLocationIndex;

// The function used to synchronize with the storage should be called periodically
// This object allows to handle the periodical call of the function
private synchronizationTimer: IntervalTimer;
Expand All @@ -70,6 +79,7 @@ export default class DataAccess implements DataAccessTypes.IDataAccess {
*/
public constructor(storage: StorageTypes.IStorage, options?: IDataAccessOptions) {
const defaultOptions: IDataAccessOptions = {
ignoredLocationIndex: new IgnoredLocationIndex(),
logger: new Utils.SimpleLogger(),
synchronizationIntervalTime: DEFAULT_INTERVAL_TIME,
transactionIndex: new TransactionIndex(),
Expand All @@ -87,6 +97,7 @@ export default class DataAccess implements DataAccessTypes.IDataAccess {
5,
);
this.transactionIndex = options.transactionIndex!;
this.ignoredLocationIndex = options.ignoredLocationIndex!;

this.logger = options.logger!;
}
Expand Down Expand Up @@ -440,6 +451,37 @@ export default class DataAccess implements DataAccessTypes.IDataAccess {
this.synchronizationTimer.stop();
}

/**
* Gets information of the data indexed
*
* @param detailed if true get the list of the files hashes
*/
public async _getStatus(detailed: boolean = false): Promise<any> {
this.checkInitialized();

// last transaction timestamp retrieved
const lastLocationTimestamp = await this.transactionIndex.getLastTransactionTimestamp();
const listIndexedLocation = await this.transactionIndex.getIndexedLocations();
const listIgnoredLocationIndex = await this.ignoredLocationIndex.getIgnoredLocations();

const synchronizationConfig = this.synchronizationTimer.getConfig();

return {
filesIgnored: {
count: Object.keys(listIgnoredLocationIndex).length,
list: detailed ? listIgnoredLocationIndex : undefined,
},
filesRetrieved: {
count: listIndexedLocation.length,
lastTimestamp: lastLocationTimestamp,
list: detailed ? listIndexedLocation : undefined,
},
lastSynchronizationTimestamp: this.lastSyncStorageTimestamp,
storage: await this.storage._getStatus(detailed),
synchronizationConfig,
};
}

/**
* Check the format of the data, extract the topics from it and push location indexed with the topics
*
Expand Down Expand Up @@ -468,6 +510,8 @@ export default class DataAccess implements DataAccessTypes.IDataAccess {
await this.transactionIndex.addTransaction(entry.id, block.header, entry.meta.timestamp);
} catch (e) {
parsingErrorCount++;
// Index ignored Location
await this.ignoredLocationIndex.pushReasonByLocation(entry.id, e.message);
this.logger.debug(`Error: can't parse content of the dataId (${entry.id}): ${e}`, [
'synchronization',
]);
Expand Down
104 changes: 104 additions & 0 deletions packages/data-access/src/ignored-location.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import * as Keyv from 'keyv';

/**
* Interface for reason from location
*/
export interface IReasonByIgnoredLocation {
[location: string]: string;
}
/**
* Class used to store the block's reason indexed by location of blocks
*/
export default class ReasonsByIgnoredLocationIndex {
/**
* reason by location
* maps dataId => reason
*/
private reasonsByIgnoredLocation: Keyv<string>;

private listIgnoredLocation: Keyv<string[]>;

/**
* reasonByLocationTransactionIndex constructor
* @param store a Keyv store to persist the index to
*/
public constructor(store?: Keyv.Store<any>) {
this.reasonsByIgnoredLocation = new Keyv<string>({
namespace: 'reasonsByIgnoredLocation',
store,
});

this.listIgnoredLocation = new Keyv<string[]>({
namespace: 'listIgnoredLocation',
store,
});
}

/**
* Function to push reason indexed by location
*
* @param dataId dataId of the block
* @param reason reason to be ignored
*/
public async pushReasonByLocation(dataId: string, reason: string): Promise<void> {
if (!(await this.reasonsByIgnoredLocation.get(dataId))) {
await this.reasonsByIgnoredLocation.set(dataId, reason);
await this.updateDataId(dataId);
}
}

/**
* Function to update reason indexed by location
*
* @param dataId dataId of the block
* @param reason reason to be ignored
*/
public async removeReasonByLocation(dataId: string): Promise<void> {
await this.reasonsByIgnoredLocation.delete(dataId);
}

/**
* Function to get reason from location
*
* @param dataId location to get the reason from
* @returns reason of the location, null if not found
*/
public async getReasonFromLocation(dataId: string): Promise<string | null> {
const reason: string | undefined = await this.reasonsByIgnoredLocation.get(dataId);
return reason ? reason : null;
}

/**
* Get the list of data ids stored
*
* @returns the list of data ids stored
*/
public async getIgnoredLocations(): Promise<IReasonByIgnoredLocation> {
const listDataId: string[] | undefined = await this.listIgnoredLocation.get('list');

if (!listDataId) {
return {};
}
const result: any = {};
for (const dataId of Array.from(listDataId)) {
result[dataId] = await this.reasonsByIgnoredLocation.get(dataId);
}

return result;
}

/**
* Update the list of data ids stored
*
* @param dataId data id to add to the list
* @returns
*/
private async updateDataId(dataId: string): Promise<void> {
let listDataIds: string[] | undefined = await this.listIgnoredLocation.get('list');
if (!listDataIds) {
listDataIds = [];
}
listDataIds.push(dataId);
await this.listIgnoredLocation.set('list', listDataIds);
}
}
12 changes: 12 additions & 0 deletions packages/data-access/src/interval-timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,16 @@ export default class IntervalTimer {
clearTimeout(this.timeoutObject);
this.timeoutObject = null;
}

/**
* Gets current configuration
*
* @return the current configuration attributes
*/
public getConfig(): any {
return {
intervalTime: this.intervalTime,
successiveFailureThreshold: this.successiveFailureThreshold,
};
}
}
38 changes: 37 additions & 1 deletion packages/data-access/src/transaction-index/transaction-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ export default class TransactionIndex implements DataAccessTypes.ITransactionInd
// Will be used to get the data from timestamp boundaries
private timestampByLocation: TimestampByLocation;

private indexedLocation: Keyv<string[]>;

/**
* Constructor of TransactionIndex
* @param store a Keyv store to persist the index to
* @param store a Keyv store to persist the index
*/
constructor(store?: Keyv.Store<any>) {
this.timestampByLocation = new TimestampByLocation(store);
this.locationByTopic = new LocationByTopic(store);

this.indexedLocation = new Keyv<string[]>({
namespace: 'indexedLocation',
store,
});
}

// tslint:disable-next-line: no-empty
Expand Down Expand Up @@ -58,6 +65,8 @@ export default class TransactionIndex implements DataAccessTypes.ITransactionInd

// add the timestamp in the index
await this.timestampByLocation.pushTimestampByLocation(dataId, timestamp);

await this.updateIndexedLocation(dataId);
}

/**
Expand Down Expand Up @@ -200,4 +209,31 @@ export default class TransactionIndex implements DataAccessTypes.ITransactionInd
return channelIds;
}
}

/**
* the list of indexed locations
*/
public async getIndexedLocations(): Promise<string[]> {
const listDataIds: string[] | undefined = await this.indexedLocation.get('list');
return listDataIds || [];
}

/**
* Update the list of data ids stored
*
* @param dataId data id to add to the list
* @returns
*/
private async updateIndexedLocation(dataId: string): Promise<void> {
let listDataIds: string[] | undefined = await this.indexedLocation.get('list');
if (!listDataIds) {
listDataIds = [];
}

// push it if not already done
if (!listDataIds.includes(dataId)) {
listDataIds.push(dataId);
await this.indexedLocation.set('list', listDataIds);
}
}
}
59 changes: 58 additions & 1 deletion packages/data-access/test/data-access.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ const defaultTestData: Promise<StorageTypes.IEntriesWithLastTimestamp> = Promise
);

const defaultFakeStorage: StorageTypes.IStorage = {
_getStatus: chai.spy(
(): any => ({
fake: 'status',
}),
),
_ipfsAdd: chai.spy(),
append: chai.spy(
(): any => {
Expand Down Expand Up @@ -138,7 +143,7 @@ let clock: sinon.SinonFakeTimers;

// tslint:disable:no-magic-numbers
/* tslint:disable:no-unused-expression */
describe.only('data-access', () => {
describe('data-access', () => {
beforeEach(async () => {
clock = sinon.useFakeTimers();
});
Expand Down Expand Up @@ -492,6 +497,7 @@ describe.only('data-access', () => {

it('cannot persistTransaction() and emit error if confirmation failed', async () => {
const mockStorageEmittingError: StorageTypes.IStorage = {
_getStatus: chai.spy(),
_ipfsAdd: chai.spy(),
append: chai.spy(
(): any => {
Expand Down Expand Up @@ -544,6 +550,55 @@ describe.only('data-access', () => {
});
});

describe('_getStatus', () => {
let dataAccess: any;

beforeEach(async () => {
const fakeStorage = {
...defaultFakeStorage,
read: (param: string): any => {
const dataIdBlock2txFake: StorageTypes.IEntry = {
content: JSON.stringify(blockWith2tx),
id: '1',
meta: { state: StorageTypes.ContentState.CONFIRMED, timestamp: 10 },
};
const result: any = {
dataIdBlock2tx: dataIdBlock2txFake,
};
return result[param];
},
};

dataAccess = new DataAccess(fakeStorage);
await dataAccess.initialize();
});

it('can _getStatus()', async () => {
expect(await dataAccess._getStatus(), 'result with arbitraryTopic1 wrong').to.deep.equal({
filesIgnored: { count: 0, list: undefined },
filesRetrieved: { count: 1, lastTimestamp: 10, list: undefined },
lastSynchronizationTimestamp: 0,
storage: { fake: 'status' },
synchronizationConfig: {
intervalTime: 10000,
successiveFailureThreshold: 5,
},
});
});
it('can _getStatus() with details', async () => {
expect(await dataAccess._getStatus(true), 'result with arbitraryTopic1 wrong').to.deep.equal({
filesIgnored: { count: 0, list: {} },
filesRetrieved: { count: 1, lastTimestamp: 10, list: ['dataIdBlock2tx'] },
lastSynchronizationTimestamp: 0,
storage: { fake: 'status' },
synchronizationConfig: {
intervalTime: 10000,
successiveFailureThreshold: 5,
},
});
});
});

it('synchronizeNewDataId() should throw an error if not initialized', async () => {
const dataAccess = new DataAccess(defaultFakeStorage);

Expand Down Expand Up @@ -572,6 +627,7 @@ describe.only('data-access', () => {
_ipfsAdd: chai.spy(),
append: chai.spy(),
getData: (): Promise<StorageTypes.IEntriesWithLastTimestamp> => testDataNotJsonData,
_getStatus: chai.spy(),
initialize: chai.spy(),
read: chai.spy(),
readMany: chai.spy(),
Expand Down Expand Up @@ -693,6 +749,7 @@ describe.only('data-access', () => {
_ipfsAdd: chai.spy(),
append: chai.spy.returns(appendResult),
getData: (): Promise<StorageTypes.IEntriesWithLastTimestamp> => chai.spy(),
_getStatus: chai.spy(),
initialize: chai.spy(),
read: chai.spy(),
readMany: chai.spy(),
Expand Down
Loading

0 comments on commit 1d9c239

Please sign in to comment.