Skip to content

Commit

Permalink
feat(ref-imp): added a number of events to the core service + fix to …
Browse files Browse the repository at this point in the history
…Observer
  • Loading branch information
thehenrytsai committed Jan 4, 2021
1 parent d8c4fb2 commit 4a3575e
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 36 deletions.
65 changes: 53 additions & 12 deletions docs/core.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Sidetree Core Node.js Implementation Document
# Sidetree Core Node.js Reference Implementation Document

This document focuses on the Node.js implementation of the Sidetree protocol.
This document focuses on the Node.js reference implementation of the Sidetree specification.

## Overview

Expand All @@ -23,12 +23,7 @@ A light node is a node that retains the ability to independently resolve DIDs wi
## Observer

The _Observer_ watches the public blockchain to identify Sidetree operations, then parses the operations into data structures that can be used for efficient DID resolutions.
The primary goals for the _Observer_ are to:
1. Maximize ingestion processing rate.
1. Allow horizontal scaling for high DID resolution throughput.
1. Allow sharing of the processed data structure by multiple Sidetree nodes to minimize redundant computation.

The above goals lead to the design decision of minimal processing of the operations at the time of ingestion, and deferring the heavy processing such as signature validations to the time of DID resolution.
The _Observer_ defers heavy processing such as signature validations to the time of DID resolution.

## Versioning
As the Sidetree protocol evolves, existing nodes executing an earlier version of the protocol need to upgrade to execute the newer version of the protocol while remaining backward compatible to processing of prior transactions and operations.
Expand Down Expand Up @@ -152,7 +147,7 @@ HTTP/1.1 200 OK


## Blockchain REST API
The blockchain REST API interface aims to abstract the underlying blockchain away from the main protocol logic. This allows the underlying blockchain to be replaced without affecting the core protocol logic. The interface also allows the protocol logic to be implemented in an entirely different language while interfacing with the same blockchain.
The blockchain REST API interface is used by the Core service and aims to abstract the underlying blockchain away from the main protocol logic. This allows the underlying blockchain to be replaced without affecting the core protocol logic. The interface also allows the protocol logic to be implemented in an entirely different language while interfacing with the same blockchain.

### Get latest blockchain time
Gets the latest logical blockchain time. This API allows the Observer and Batch Writer to determine protocol version to be used.
Expand Down Expand Up @@ -455,7 +450,7 @@ Returns `HTTP 400 Bad Request` with the following values as the `code` parameter
| Code | Description |
| ------------------------------- | ----------------------------------------------------------------------------------------------------------- |
| spending_cap_per_period_reached | if with the given fee (derived from minimumFee) this node will exceed the spending limit as configured in the parameters. |
| not_enough_balace_for_write | if the wallet configured in the parameters does not have enough balance to complete the write operation. |
| not_enough_balance_for_write | if the wallet configured in the parameters does not have enough balance to complete the write operation. |

#### Request path
```
Expand Down Expand Up @@ -572,7 +567,7 @@ GET /locks/gHasdfasodf23230o0jlk23323
"amountLocked": "A number representing the amount that was locked.",
"identifier": "The string representing the identifier of the lock. This is the same value which is passed in the request path.",
"lockTransactionTime": "A number representing the transaction time at which the lock became active.",
"owner": "A string reprsenting the owner of the lock.",
"owner": "A string representing the owner of the lock.",
"unlockTransactionTime": "A number representing the transaction time at which the lock became inactive."
}
```
Expand Down Expand Up @@ -627,7 +622,7 @@ GET /writerlock
"amountLocked": "A number representing the amount that was locked.",
"identifier": "The string representing the identifier of the lock.",
"lockTransactionTime": "A number representing the transaction time at which the lock became active.",
"owner": "A string reprsenting the owner of the lock.",
"owner": "A string representing the owner of the lock.",
"unlockTransactionTime": "A number representing the transaction time at which the lock became inactive."
}
```
Expand Down Expand Up @@ -697,3 +692,49 @@ HTTP/1.1 200 OK
}
```

## Core Service Events

### `batch_writer_processing_loop_failed`
Occurs every time the batch writer fails a processing loop.

Event data: none

### `batch_writer_processing_loop_success`
Occurs every time the batch writer completes a processing loop.

Event data:
```json
{
"batchSize": "The size of the batch written.",
}
```

### `blockchain_time_changed`
Occurs every time the underlying blockchain time changes.

Event data:
```json
{
"time": "The logical blockchain time as an integer."
}
```

### `download_manager_download`
Occurs every time an asynchronous content download has occurred regardless of success.

Event data:
```json
{
"code": "The download result code."
}
```

### `observer_processing_loop_failed`
Occurs every time the observer fails a processing loop.

Event data: none

### `observer_processing_loop_success`
Occurs every time the observer completes a processing loop.

Event data: none
8 changes: 6 additions & 2 deletions lib/common/EventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import LogColor from './LogColor';
export default class EventEmitter {
// Default to basic console log.
private static singleton: IEventEmitter = {
emit: async (eventCode) => {
console.log(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}`));
emit: async (eventCode, eventData?) => {
if (eventData === undefined) {
console.log(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}`));
} else {
console.log(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}: ${JSON.stringify(eventData)}`));
}
}
};

Expand Down
5 changes: 3 additions & 2 deletions lib/core/BatchScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ export default class BatchScheduler {
const currentTime = this.blockchain.approximateTime.time;
const batchWriter = this.versionManager.getBatchWriter(currentTime);

await batchWriter.write();
const batchSize = await batchWriter.write();

EventEmitter.emit(EventCode.BatchWriterProcessingLoopSuccess);
EventEmitter.emit(EventCode.BatchWriterProcessingLoopSuccess, { batchSize });
} catch (error) {
EventEmitter.emit(EventCode.BatchWriterProcessingLoopFailed);
Logger.error('Unexpected and unhandled error during batch writing, investigate and fix:');
Logger.error(error);
} finally {
Expand Down
15 changes: 11 additions & 4 deletions lib/core/Blockchain.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as HttpStatus from 'http-status';
import BlockchainTimeModel from './models/BlockchainTimeModel';
import CoreErrorCode from './ErrorCode';
import EventCode from './EventCode';
import EventEmitter from '../common/EventEmitter';
import IBlockchain from './interfaces/IBlockchain';
import JsonAsync from './versions/latest/util/JsonAsync';
import Logger from '../common/Logger';
Expand Down Expand Up @@ -164,13 +166,18 @@ export default class Blockchain implements IBlockchain {
throw new SidetreeError(CoreErrorCode.BlockchainGetLatestTimeResponseNotOk, errorMessage);
}

const responseBody = JSON.parse(responseBodyString);
const newBlockchainTimeModel = JSON.parse(responseBodyString) as BlockchainTimeModel;

// Update the cached blockchain time every time blockchain time is fetched over the network,
this.cachedBlockchainTime = responseBody;
// Emit a time change event.
if (newBlockchainTimeModel.time !== this.cachedBlockchainTime.time) {
EventEmitter.emit(EventCode.BlockchainTimeChanged, { time: newBlockchainTimeModel.time });
}

// Update the cached blockchain time every time blockchain time is fetched over the network.
this.cachedBlockchainTime = newBlockchainTimeModel;

Logger.info(`Refreshed blockchain time: ${responseBodyString}`);
return responseBody;
return newBlockchainTimeModel;
}

public async getFee (transactionTime: number): Promise<number> {
Expand Down
3 changes: 3 additions & 0 deletions lib/core/DownloadManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as crypto from 'crypto';
import EventCode from './EventCode';
import EventEmitter from '../common/EventEmitter';
import FetchResult from '../common/models/FetchResult';
import ICas from './interfaces/ICas';
import Logger from '../common/Logger';
Expand Down Expand Up @@ -130,6 +132,7 @@ export default class DownloadManager {
const fetchResult = this.completedDownloads.get(handle);
this.completedDownloads.delete(handle);

EventEmitter.emit(EventCode.DownloadManagerDownload, { code: fetchResult!.code });
return fetchResult!;
}

Expand Down
4 changes: 4 additions & 0 deletions lib/core/EventCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
* Event codes used by Sidetree core service.
*/
export default {
BatchWriterProcessingLoopFailed: 'batch_writer_processing_loop_failed',
BatchWriterProcessingLoopSuccess: 'batch_writer_processing_loop_success',
BlockchainTimeChanged: 'blockchain_time_changed',
DownloadManagerDownload: 'download_manager_download',
ObserverProcessingLoopFailed: 'observer_processing_loop_failed',
ObserverProcessingLoopSuccess: 'observer_processing_loop_success'
};
17 changes: 9 additions & 8 deletions lib/core/Observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export default class Observer {
*/
private async processTransactions () {
try {
await this.storeConsecutiveTransactionsProcessed(); // Do this in multiple places
await this.storeConsecutiveTransactionsProcessed();

// Keep fetching new Sidetree transactions from blockchain and processing them
// until there are no more new transactions or there is a block reorganization.
Expand Down Expand Up @@ -112,13 +112,13 @@ export default class Observer {

// Queue parallel downloading and processing of chunk files.
for (const transaction of qualifiedTransactions) {
const awaitingTransaction = {
const transactionUnderProcessing = {
transaction: transaction,
processingStatus: TransactionProcessingStatus.Pending
};
this.transactionsUnderProcessing.push(awaitingTransaction);
this.transactionsUnderProcessing.push(transactionUnderProcessing);
// Intentionally not awaiting on downloading and processing each operation batch.
this.processTransaction(transaction, awaitingTransaction);
this.processTransaction(transaction, transactionUnderProcessing);
}

// NOTE: Blockchain reorg has happened for sure only if `invalidTransactionNumberOrTimeHash` AND
Expand Down Expand Up @@ -158,14 +158,14 @@ export default class Observer {
}
} while (moreTransactions);

await this.storeConsecutiveTransactionsProcessed();
Logger.info('Successfully kicked off downloading/processing of all new Sidetree transactions.');

// Continue onto processing unresolvable transactions if any.
await this.processUnresolvableTransactions();

EventEmitter.emit(EventCode.ObserverProcessingLoopSuccess);
} catch (error) {
EventEmitter.emit(EventCode.ObserverProcessingLoopFailed);
Logger.error(`Encountered unhandled and possibly fatal Observer error, must investigate and fix:`);
Logger.error(error);
} finally {
Expand Down Expand Up @@ -288,11 +288,12 @@ export default class Observer {
Logger.info('Reverting operations...');
await this.operationStore.delete(bestKnownValidRecentTransactionNumber);

// NOTE: MUST do this step LAST to handle incomplete operation rollback due to unexpected scenarios, such as power outage etc.
await this.transactionStore.removeTransactionsLaterThan(bestKnownValidRecentTransactionNumber);
await this.unresolvableTransactionStore.removeUnresolvableTransactionsLaterThan(bestKnownValidRecentTransactionNumber);

// Reset the in-memory last known good Transaction so we next processing cycle will fetch from the correct timestamp/maker.
// NOTE: MUST do steps below LAST in this particular order to handle incomplete operation rollback due to unexpected scenarios, such as power outage etc.
await this.transactionStore.removeTransactionsLaterThan(bestKnownValidRecentTransactionNumber);

// Reset the in-memory last known good Transaction so next processing cycle will fetch from the correct timestamp/marker.
this.lastKnownTransaction = bestKnownValidRecentTransaction;
}
}
3 changes: 2 additions & 1 deletion lib/core/interfaces/IBatchWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export default interface IBatchWriter {
/**
* Writes one or more batches of batches of operations to content addressable storage and blockchain.
* @returns The size of the batch written, 0 if no batch is written.
*/
write (): Promise<void>;
write (): Promise<number>;
}
6 changes: 4 additions & 2 deletions lib/core/versions/latest/BatchWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export default class BatchWriter implements IBatchWriter {
private cas: ICas,
private versionMetadataFetcher: IVersionMetadataFetcher) { }

public async write () {
public async write (): Promise<number> {
const normalizedFee = await this.blockchain.getFee(this.blockchain.approximateTime.time);
const currentLock = await this.blockchain.getWriterValueTimeLock();
const numberOfOpsAllowed = this.getNumberOfOperationsAllowed(currentLock);
Expand All @@ -45,7 +45,7 @@ export default class BatchWriter implements IBatchWriter {
// Do nothing if there is nothing to batch together.
if (queuedOperations.length === 0) {
Logger.info(`No queued operations to batch.`);
return;
return 0;
}

Logger.info(LogColor.lightBlue(`Batch size = ${LogColor.green(numberOfOperations)}`));
Expand Down Expand Up @@ -101,6 +101,8 @@ export default class BatchWriter implements IBatchWriter {

// Remove written operations from queue after batch writing has completed successfully.
await this.operationQueue.dequeue(queuedOperations.length);

return numberOfOperations;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions lib/core/versions/latest/TransactionProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ export default class TransactionProcessor implements ITransactionProcessor {
} catch (error) {
// If we encounter any error, regardless of whether the transaction should be retried for processing,
// we set all the provisional/chunk files to be `undefined`,
// this is because chunk file would not be available or valid for its deltas to be used during resolutions,
// thus no need to store the operation references in the provisional index file.
// this is because chunk file would not be available/valid for its deltas to be used during resolutions,
// thus no need to store the operation references found in the provisional index file.
provisionalIndexFile = undefined;
provisionalProofFile = undefined;
chunkFileModel = undefined;
Expand Down
25 changes: 24 additions & 1 deletion tests/core/Blockchain.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Blockchain from '../../lib/core/Blockchain';
import CoreErrorCode from '../../lib/core/ErrorCode';
import ErrorCode from '../../lib/bitcoin/ErrorCode';
import EventEmitter from '../../lib/common/EventEmitter';
import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator';
import ReadableStream from '../../lib/common/ReadableStream';
import ServiceVersionModel from '../../lib/common/models/ServiceVersionModel';
Expand Down Expand Up @@ -267,7 +268,7 @@ describe('Blockchain', async () => {
});

describe('getLatestTime()', async () => {
it('should throw if encountered error when fetching time from blockchain service..', async () => {
it('should throw if encountered error when fetching time from blockchain service.', async () => {
const blockchainClient = new Blockchain('Unused URI');
const mockFetchResponse = {
status: 500,
Expand All @@ -291,6 +292,28 @@ describe('Blockchain', async () => {

fail();
});

it('should not emit an event when underlying blockchain time is unchanged. ', async () => {
const blockchainClient = new Blockchain('Unused URI');

// Preset the cached to time to a hardcoded value.
const hardcodedBlockchainTimeModel = { time: 1, hash: 'unused' };
blockchainClient['cachedBlockchainTime'] = hardcodedBlockchainTimeModel;

// Mock the time fetch response to the same hardcoded value to simulate unchanged time.
const mockFetchResponse = {
status: 200,
body: {
read: () => { return Buffer.from(JSON.stringify(hardcodedBlockchainTimeModel)); }
}
};
spyOn(blockchainClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse));
const eventEmitterSpy = spyOn(EventEmitter, 'emit');

await blockchainClient.getLatestTime();

expect(eventEmitterSpy).not.toHaveBeenCalled();
});
});

describe('initialize', async () => {
Expand Down
2 changes: 1 addition & 1 deletion tests/core/versions/test-version-1/BatchWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export default class BatchWriter implements IBatchWriter {
console.info(this.operationQueue, this.blockchain, this.cas, this.versionMetadataFetcher);
}

async write (): Promise<void> {
async write (): Promise<number> {
throw new Error('BatchWriter: Not implemented. Version: TestVersion1');
}
}
3 changes: 2 additions & 1 deletion tests/mocks/MockBatchWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ export default class MockBatchWriter implements IBatchWriter {
/** Keeps invocation count for testing purposes. */
public invocationCount = 0;

public async write (): Promise<void> {
public async write (): Promise<number> {
this.invocationCount++;
return 0;
}
}

0 comments on commit 4a3575e

Please sign in to comment.