Skip to content
Permalink
Browse files

Merge pull request #76 from Emurgo/feature/elastic-chunks-support

Feature/elastic chunks support
  • Loading branch information...
vantuz-subhuman committed Oct 9, 2019
2 parents 28d4288 + 6e15f9e commit 8bd6dec281dfaec90f7c491d80eabca6da5b2577
@@ -4,7 +4,10 @@
"baseUrl": "http://localhost:8082",
"template": "testnet2"
},
"elasticNode": "http://localhost:9200",
"elastic": {
"node": "http://localhost:9200",
"indexPrefix": "seiza"
},
"storageProcessor": "seiza-elastic",
"server": {
"port": 8080,
@@ -20,7 +23,7 @@
},
"checkTipSeconds": 15,
"rollbackBlocksCount": 25,
"maxBlockBatchSize": 800,
"maxBlockBatchSize": 900,
"defaultNetwork": "testnet2",
"defaultBridgeUrl": "http://localhost:8082",
"networks": {
@@ -0,0 +1,161 @@
// flow-typed signature: d313afea12fc960c924825fc728d6a90
// flow-typed version: c6154227d1/bunyan_v1.x.x/flow_>=v0.104.x

declare module 'bunyan' {
declare var TRACE: 10;
declare var DEBUG: 20;
declare var INFO: 30;
declare var WARN: 40;
declare var ERROR: 50;
declare var FATAL: 60;

declare type BunyanLogLevels =
| 60 // fatal
| 50 // error
| 40 // warn
| 30 // info
| 20 // debug
| 10; // info
declare type BunyanRecord = {
[key: string]: any,
v: number,
level: BunyanLogLevels,
name: string,
hostname: string,
pid: string,
time: Date,
msg: string,
src: string,
err?: {
message: string,
name: string,
code: any,
signal: any,
stack: string,
...
},
...
};
declare type Writable = { write(rec: BunyanRecord): void, ... };
declare class Logger extends events$EventEmitter {
constructor(options: LoggerOptions): any;
addStream(stream: Stream): void;
addSerializers(serializers: Serializers): void;
child(opts?: LoggerOptions, simple?: boolean): Logger;
reopenFileStreams(): void;
level(): string | number;
level(value: number | string): void;
levels(name: number | string, value: number | string): void;
trace(...params: Array<void>): boolean;
trace(error: Error, format?: any, ...params: Array<any>): void;
trace(buffer: Buffer, format?: any, ...params: Array<any>): void;
trace(obj: Object, format?: any, ...params: Array<any>): void;
trace(format: string, ...params: Array<any>): void;
debug(...params: Array<void>): boolean;
debug(error: Error, format?: any, ...params: Array<any>): void;
debug(buffer: Buffer, format?: any, ...params: Array<any>): void;
debug(obj: Object, format?: any, ...params: Array<any>): void;
debug(format: string, ...params: Array<any>): void;
info(...params: Array<void>): boolean;
info(error: Error, format?: any, ...params: Array<any>): void;
info(buffer: Buffer, format?: any, ...params: Array<any>): void;
info(obj: Object, format?: any, ...params: Array<any>): void;
info(format: string, ...params: Array<any>): void;
warn(...params: Array<void>): boolean;
warn(error: Error, format?: any, ...params: Array<any>): void;
warn(buffer: Buffer, format?: any, ...params: Array<any>): void;
warn(obj: Object, format?: any, ...params: Array<any>): void;
warn(format: string, ...params: Array<any>): void;
error(...params: Array<void>): boolean;
error(error: Error, format?: any, ...params: Array<any>): void;
error(buffer: Buffer, format?: any, ...params: Array<any>): void;
error(obj: Object, format?: any, ...params: Array<any>): void;
error(format: string, ...params: Array<any>): void;
fatal(...params: Array<void>): boolean;
fatal(error: Error, format?: any, ...params: Array<any>): void;
fatal(buffer: Buffer, format?: any, ...params: Array<any>): void;
fatal(obj: Object, format?: any, ...params: Array<any>): void;
fatal(format: string, ...params: Array<any>): void;
static stdSerializers: {
req: (
req: http$ClientRequest<>
) => {
method: string,
url: string,
headers: mixed,
remoteAddress: string,
remotePort: number,
...
},
res: (
res: http$IncomingMessage<>
) => {
statusCode: number,
header: string,
...
},
err: (
err: Error
) => {
message: string,
name: string,
stack: string,
code: string,
signal: string,
...
},
...
};
}
declare interface LoggerOptions {
streams?: Array<Stream>;
level?: BunyanLogLevels | string;
stream?: stream$Writable;
serializers?: Serializers;
src?: boolean;
}
declare type Serializers = { [key: string]: (input: any) => mixed, ... };
declare type Stream = {
type?: string,
level?: number | string,
path?: string,
stream?: stream$Writable | tty$WriteStream | Stream | Writable,
closeOnExit?: boolean,
period?: string,
count?: number,
...
};
declare var stdSerializers: Serializers;
declare function resolveLevel(value: number | string): number;
declare function createLogger(
options: LoggerOptions & { name: string, ... }
): Logger;
declare class RingBuffer extends events$EventEmitter {
constructor(options: RingBufferOptions): any;
writable: boolean;
records: Array<any>;
write(record: BunyanRecord): void;
end(record?: any): void;
destroy(): void;
destroySoon(): void;
}
declare interface RingBufferOptions {
limit: number;
}
declare function safeCycles(): (key: string, value: any) => any;
declare class ConsoleRawStream {
write(rec: BunyanRecord): void;
}
declare var levelFromName: {
trace: typeof TRACE,
debug: typeof DEBUG,
info: typeof INFO,
warn: typeof WARN,
error: typeof ERROR,
fatal: typeof FATAL,
...
};
declare var nameFromLevel: { [key: BunyanLogLevels]: string, ... };
declare var VERSION: string;
declare var LOG_VERSION: string;
}
@@ -1,5 +1,7 @@
// @flow

import type { Logger } from 'bunyan'

import cbor from 'cbor'
import bs58 from 'bs58'
import blake from 'blakejs'
@@ -13,7 +15,7 @@ import { Controller as IController } from 'inversify-restify-utils/lib/interface
import { injectable, decorate, inject } from 'inversify'

import {
Logger, RawDataProvider, StorageProcessor, NetworkConfig,
RawDataProvider, StorageProcessor, NetworkConfig,
} from '../interfaces'
import SERVICE_IDENTIFIER from '../constants/identifiers'
import utils from '../blockchain/utils'
@@ -36,19 +36,8 @@ const GET_BEST_BLOCK_NUM = sql.select()
const GET_UTXOS_BLOCKS_COUNT = sql.select()
.field('(select count(*) from utxos ) + ( select count(*) from blocks) as cnt')


const utxoOnConflictUpdateBlockNum = (query) => {
// squel don't support 'EXCLUDED'
// workaround taken from https://github.com/hiddentao/squel/issues/342
const onConflictClause = ' ON CONFLICT (utxo_id) DO UPDATE SET block_num = EXCLUDED.block_num'
const queryParam = query.toParam()
queryParam.text += onConflictClause
return queryParam
}

export default {
sql,
utxoOnConflictUpdateBlockNum,
UTXOS_INSERT,
GET_BEST_BLOCK_NUM,
BEST_BLOCK_UPDATE,
@@ -1,12 +1,12 @@
// @flow

import _ from 'lodash'
import type { Logger } from 'bunyan'
import { helpers } from 'inversify-vanillajs-helpers'

import type {
Scheduler,
RawDataProvider,
Logger,
StorageProcessor,
} from '../interfaces'
import SERVICE_IDENTIFIER from '../constants/identifiers'
@@ -34,7 +34,7 @@ class CronScheduler implements Scheduler {

storageProcessor: StorageProcessor

#logger: any
logger: Logger

checkTipMillis: number

@@ -61,29 +61,29 @@ class CronScheduler implements Scheduler {
this.maxBlockBatchSize = maxBlockBatchSize
logger.debug('Checking tip every', checkTipSeconds, 'seconds')
logger.debug('Rollback blocks count', rollbackBlocksCount)
this.#logger = logger
this.logger = logger
this.blocksToStore = []
this.lastBlock = null
}

async rollback(atBlockHeight: number) {
this.#logger.info(`Rollback at height ${atBlockHeight} to ${this.rollbackBlocksCount} blocks back.`)
this.logger.info(`Rollback at height ${atBlockHeight} to ${this.rollbackBlocksCount} blocks back.`)
// reset scheduler state
this.blocksToStore = []
this.lastBlock = null

// Recover database state to newest actual block.
const { height } = await this.storageProcessor.getBestBlockNum()
const rollBackTo = height - this.rollbackBlocksCount
this.#logger.info(`Current DB height at rollback time: ${height}. Rolling back to: ${rollBackTo}`)
this.logger.info(`Current DB height at rollback time: ${height}. Rolling back to: ${rollBackTo}`)
await this.storageProcessor.rollbackTo(rollBackTo)
const { epoch, hash } = await this.storageProcessor.getBestBlockNum()
this.lastBlock = { epoch, hash }
}


async processEpochId(id: number, height: number) {
this.#logger.info(`processEpochId: ${id}, ${height}`)
this.logger.info(`processEpochId: ${id}, ${height}`)
const omitEbb = true
const blocks = await this.#dataProvider.getParsedEpochById(id, omitEbb)
for (const block of blocks) {
@@ -104,7 +104,7 @@ class CronScheduler implements Scheduler {
&& block.epoch === this.lastBlock.epoch
&& block.prevHash !== this.lastBlock.hash) {
const lastBlockHash = this.lastBlock ? this.lastBlock.hash : ''
this.#logger.info(`(${block.epoch}/${String(block.slot)}) block.prevHash (${block.prevHash}) !== lastBlock.hash (${lastBlockHash}). Performing rollback...`)
this.logger.info(`(${block.epoch}/${String(block.slot)}) block.prevHash (${block.prevHash}) !== lastBlock.hash (${lastBlockHash}). Performing rollback...`)
return STATUS_ROLLBACK_REQUIRED
}
this.lastBlock = {
@@ -118,13 +118,13 @@ class CronScheduler implements Scheduler {
}

if (flushCache || block.height % LOG_BLOCK_PARSED_THRESHOLD === 0) {
this.#logger.debug(`Block parsed: ${block.hash} ${block.epoch} ${String(block.slot)} ${block.height}`)
this.logger.debug(`Block parsed: ${block.hash} ${block.epoch} ${String(block.slot)} ${block.height}`)
}
return BLOCK_STATUS_PROCESSED
}

async checkTip() {
this.#logger.info('checkTip: checking for new blocks...')
this.logger.info('checkTip: checking for new blocks...')
// local state
const { height, epoch, slot } = await this.storageProcessor.getBestBlockNum()

@@ -134,10 +134,10 @@ class CronScheduler implements Scheduler {
const tipStatus = nodeTip.local
const remoteStatus = nodeTip.remote
if (!tipStatus) {
this.#logger.info('cardano-http-brdige not yet synced')
this.logger.info('cardano-http-brdige not yet synced')
return
}
this.#logger.debug(`Last imported block ${height}. Node status: local=${tipStatus.slot} remote=${remoteStatus.slot} packedEpochs=${packedEpochs}`)
this.logger.debug(`Last imported block ${height}. Node status: local=${tipStatus.slot} remote=${remoteStatus.slot} packedEpochs=${packedEpochs}`)
const [remEpoch, remSlot] = remoteStatus.slot
if (epoch < remEpoch) {
// If local epoch is lower than the current network tip
@@ -153,11 +153,11 @@ class CronScheduler implements Scheduler {
for (const epochId of _.range(epoch, packedEpochs)) {
// Process epoch
await this.processEpochId(epochId, height)
this.#logger.debug(`Epoch parsed: ${epochId}, ${height}`)
this.logger.debug(`Epoch parsed: ${epochId}, ${height}`)
}
} else {
// Packed epoch is not available yet
this.#logger.info(`cardano-http-brdige has not yet packed stable epoch: ${epoch} (lastRemStableEpoch=${lastRemStableEpoch})`)
this.logger.info(`cardano-http-brdige has not yet packed stable epoch: ${epoch} (lastRemStableEpoch=${lastRemStableEpoch})`)
}
return
}
@@ -167,15 +167,15 @@ class CronScheduler implements Scheduler {
blockHeight++, i++) {
const status = await this.processBlockHeight(blockHeight)
if (status === STATUS_ROLLBACK_REQUIRED) {
this.#logger.info('Rollback required.')
this.logger.info('Rollback required.')
await this.rollback(blockHeight)
return
}
}
}

async startAsync() {
this.#logger.info('Scheduler async: starting chain syncing loop')
this.logger.info('Scheduler async: starting chain syncing loop')
const currentMillis = () => new Date().getTime()
const sleep = millis => new Promise(resolve => setTimeout(resolve, millis))
for (;;) {
@@ -187,17 +187,17 @@ class CronScheduler implements Scheduler {
const meta = ERROR_META[e.name]
if (meta) {
errorSleep = meta.sleep
this.#logger.warn(`Scheduler async: failed to check tip :: ${meta.msg}. Sleeping and retrying (err_sleep=${errorSleep})`)
this.logger.warn(`Scheduler async: failed to check tip :: ${meta.msg}. Sleeping and retrying (err_sleep=${errorSleep})`)
} else {
throw e
}
}
const millisEnd = currentMillis()
const millisPassed = millisEnd - millisStart
this.#logger.debug(`Scheduler async: loop finished (millisPassed=${millisPassed})`)
this.logger.debug(`Scheduler async: loop finished (millisPassed=${millisPassed})`)
const millisSleep = errorSleep || (this.checkTipMillis - millisPassed)
if (millisSleep > 0) {
this.#logger.debug('Scheduler async: sleeping for', millisSleep)
this.logger.debug('Scheduler async: sleeping for', millisSleep)
await sleep(millisSleep)
}
}

0 comments on commit 8bd6dec

Please sign in to comment.
You can’t perform that action at this time.