Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suite of fixes & improvements in stats calculation and Anchor support #11

Merged
merged 3 commits into from Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
273 changes: 211 additions & 62 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -15,7 +15,8 @@
"start-alone": "./run.sh",
"test": "node --experimental-vm-modules node_modules/jest/bin/jest.js",
"test:cov": "node --experimental-vm-modules node_modules/jest/bin/jest.js --coverage",
"test:rebuild": "npm run clean:all && npm i && npm run test",
"test:rebuild": "npm run build && npm run test",
"test:reinstall": "npm run clean:all && npm i && npm run test",
"lint": "eslint --cache --ignore-path .gitignore './packages/**/src/**/*.{js,ts,tsx}'",
"lint:fix": "npm run lint -- --fix --quiet",
"build": "npm run clean:dist && npm run build:ts",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Expand Up @@ -39,7 +39,7 @@
"dependencies": {
"@graphql-tools/schema": "^9.0.4",
"@solana/spl-token-registry": "^0.2.7",
"@solana/web3.js": "^1.65.0",
"@solana/web3.js": "^1.66.2",
"bn.js": "^5.2.0",
"cacheable-lookup": "^6.0.4",
"compression": "^1.7.4",
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/storage/__tests__/storage.spec.ts
Expand Up @@ -35,7 +35,7 @@ function getMockedEntity(i: number): MyEntity {
}
}

xdescribe('storage v2 integration tests', () => {
describe('storage v2 integration tests', () => {
const entities = getMockedEntities()

it('should work as expected', async () => {
Expand Down Expand Up @@ -66,7 +66,7 @@ xdescribe('storage v2 integration tests', () => {
],
})

// await storage.save(entities)
await storage.save(entities)

for await (const entry of await storage.getAll()) {
console.log('by id', entry)
Expand Down
16 changes: 10 additions & 6 deletions packages/core/src/utils/time.ts
Expand Up @@ -37,11 +37,11 @@ export function splitDurationIntoIntervals(
? DateTime.fromISO(end, zone)
: DateTime.fromMillis(end, zone)

if (startDate >= endDate) {
if (startDate > endDate) {
return []
}

const length = Math.ceil(
const length = startDate.equals(endDate) ? 1 : Math.ceil(
endDate
.diff(startDate, intervalUnit as DurationUnit)
.get(intervalUnit as DurationUnit) / intervalSize,
Expand All @@ -66,10 +66,14 @@ export function splitDurationIntoIntervals(
// Override leftmost bound with "startDate" and rightmost bound with "endDate"
// [(startDate, A), (A, B), (B, C), (C, endDate)]
if (preserveExactBounds) {
intervals[0] = intervals[0].set({ start: startDate })
intervals[intervals.length - 1] = intervals[intervals.length - 1].set({
end: endDate,
})
if (startDate) {
intervals[0] = intervals[0].set({ start: startDate })
}
if (endDate) {
intervals[intervals.length - 1] = intervals[intervals.length - 1].set({
end: endDate,
})
}
}

// console.log(
Expand Down
2 changes: 1 addition & 1 deletion packages/framework/package.json
Expand Up @@ -35,7 +35,7 @@
"dependencies": {
"@aleph-indexer/core": "^1.0.21",
"@aleph-indexer/layout": "^1.0.14",
"@solana/web3.js": "^1.65.0",
"@solana/web3.js": "^1.66.2",
"bn.js": "^5.2.0",
"bs58": "^5.0.0",
"buffer-layout": "^1.2.2",
Expand Down
4 changes: 2 additions & 2 deletions packages/framework/src/services/fetcher/main.ts
Expand Up @@ -87,7 +87,7 @@ export class FetcherMsMain implements FetcherMsI, PrivateFetcherMsI {
this.pendingTransactions = new PendingWorkPool({
id: 'pending-transactions',
interval: 0,
chunkSize: 1000,
chunkSize: 500,
concurrency: 1,
dal: this.pendingTransactionDAL,
handleWork: this._handlePendingTransactions.bind(this),
Expand All @@ -107,7 +107,7 @@ export class FetcherMsMain implements FetcherMsI, PrivateFetcherMsI {
this.pendingTransactionsFetch = new PendingWorkPool({
id: 'pending-transactions-fetch',
interval: 0,
chunkSize: 200,
chunkSize: 100,
concurrency: 5,
dal: this.pendingTransactionFetchDAL,
handleWork: this._handlePendingTransactionsFetch.bind(this),
Expand Down
100 changes: 100 additions & 0 deletions packages/framework/src/services/parser/src/anchorInstructionParser.ts
@@ -0,0 +1,100 @@
import bs58 from 'bs58'
import {
AlephParsedInnerInstruction,
AlephParsedInstruction,
AlephParsedParsedInstruction,
RawInstruction,
} from '@aleph-indexer/core'
import { DefinedParser } from './parser.js'

/**
* Parses a raw instruction, if a parser for given solana program is available.
* This parser is automatically used for indexers generated from IDL with anchor-ts-generator.
*/
export class AnchorInstructionParser<
EventTypeEnum extends string,
> extends DefinedParser<
RawInstruction,
RawInstruction | AlephParsedInstruction
> {
constructor(
public programId: string,
public programName: string,
protected getInstructionType: (data: Buffer) => EventTypeEnum | undefined,
protected accountLayouts: Partial<Record<EventTypeEnum, any>>,
protected dataLayouts: Partial<Record<EventTypeEnum, any>>,
) {
super()
}

parse(
rawIx: RawInstruction | AlephParsedInstruction,
): RawInstruction | AlephParsedInstruction {
if (!this.isCompatibleInstruction(rawIx)) return rawIx

const decoded = this.getInstructionData(rawIx)
if (!decoded) return rawIx

const type = this.getInstructionType(decoded)
if (!type) return rawIx

const parsedIx: AlephParsedParsedInstruction = rawIx as any
parsedIx.program = this.programName

const { instructionDiscriminator, ...data } = this.parseInstructionData(type, decoded)[0]
const accounts = this.parseInstructionAccounts(type, parsedIx)

parsedIx.parsed = {
type,
info: {
...(rawIx as any).parsed?.info,
data,
accounts,
},
}

return parsedIx
}

protected isCompatibleInstruction(
ix: RawInstruction | AlephParsedInstruction | AlephParsedInnerInstruction,
): boolean {
return ix.programId === this.programId
}

protected getInstructionData(
rawIx: RawInstruction | AlephParsedInstruction,
): Buffer | undefined {
if (!('data' in rawIx)) return
return Buffer.from(bs58.decode(rawIx.data))
}

protected parseInstructionData(type: EventTypeEnum, data: Buffer): any {
try {
const template = this.dataLayouts[type]
if (!template) return {}

return this.dataLayouts[type].deserialize(data)
} catch (e) {
console.error(e)
}
}

protected parseInstructionAccounts(
type: EventTypeEnum,
rawIx: RawInstruction | AlephParsedInstruction,
): any {
const info: any = {}

const template = this.dataLayouts[type]
if (!template) return {}

if ('accounts' in rawIx) {
for (const [index, name] of this.accountLayouts[type].entries()) {
info[name] = rawIx.accounts[index]
}
}

return info
}
}
@@ -1,8 +1,9 @@
import { ParsedInstructionV1, RawInstruction } from '@aleph-indexer/core'
import { LayoutFactory } from './layout/layoutFactory.js'
import { DefinedParser } from './parser.js'
import { InstructionParser } from './instructionParser.js'
import { SplInstructionParser } from './splInstructionParser.js'
import { LayoutImplementation } from './layout/types.js'
import { AnchorInstructionParser } from "./anchorInstructionParser.js";

/**
* Finds all available instruction parsers and aggregates them for use.
Expand Down Expand Up @@ -62,13 +63,24 @@ export class InstructionParserLibrary extends DefinedParser<

if (!implementation) return

parser = new InstructionParser(
implementation.programID,
implementation.name,
implementation.getInstructionType,
implementation.accountLayoutMap,
implementation.dataLayoutMap,
)
// @note: deserialize() is used in Beet, so we will use AnchorInstructionParser here
if (Object.values(implementation.dataLayoutMap)[0].deserialize) {
parser = new AnchorInstructionParser(
implementation.programID,
implementation.name,
implementation.getInstructionType,
implementation.accountLayoutMap,
implementation.dataLayoutMap
)
} else {
parser = new SplInstructionParser(
implementation.programID,
implementation.name,
implementation.getInstructionType,
implementation.accountLayoutMap,
implementation.dataLayoutMap,
)
}

this.instructionParsers[programId] = parser

Expand Down
Expand Up @@ -7,8 +7,12 @@ import {
} from '@aleph-indexer/core'
import { DefinedParser } from './parser.js'

// @note: Based on solana-program-library (would need different implementation for Anchor-based programs).
export class InstructionParser<
/**
* Parses a raw instruction, if a parser for given solana program is available.
* Based on solana-program-library, use {@link AnchorInstructionParser} for Anchor instructions, when using Beet as a
* layout descriptor.
*/
export class SplInstructionParser<
EventTypeEnum extends string,
> extends DefinedParser<
RawInstruction,
Expand Down
11 changes: 5 additions & 6 deletions packages/framework/src/utils/domain/indexer/main.ts
Expand Up @@ -161,11 +161,11 @@ export class IndexerMainDomain {
* @param type The type of time-series to get.
* @param filters The transformations and clipping to apply to the time-series.
*/
async getAccountTimeSeriesStats(
async getAccountTimeSeriesStats<V>(
accounts: string[] = [],
type: string,
filters: AccountStatsFilters,
): Promise<AccountTimeSeriesStats[]> {
): Promise<AccountTimeSeriesStats<V>[]> {
this.checkStats()

accounts =
Expand All @@ -177,7 +177,7 @@ export class IndexerMainDomain {
account,
method: 'getTimeSeriesStats',
args: [type, filters],
})) as AccountTimeSeriesStats
})) as AccountTimeSeriesStats<V>

return stats
}),
Expand All @@ -188,19 +188,18 @@ export class IndexerMainDomain {
* Returns the global stats for the given accounts.
* @param accounts The accounts to get the summary from.
*/
async getAccountStats(accounts: string[] = []): Promise<AccountStats[]> {
async getAccountStats<V>(accounts: string[] = []): Promise<AccountStats<V>[]> {
this.checkStats()

accounts =
accounts.length !== 0 ? accounts : Array.from(this.accounts.values())

return Promise.all(
accounts.map(async (account) => {
const stats = (await this.context.apiClient.invokeDomainMethod({
account,
method: 'getStats',
args: [],
})) as AccountStats
})) as AccountStats<V>

return stats
}),
Expand Down
2 changes: 1 addition & 1 deletion packages/framework/src/utils/moleculer/config.ts
Expand Up @@ -31,7 +31,7 @@ export const defaultBrokerConfig: BrokerOptions = {
requestTimeout: 0,
serializer: 'JSON',
internalServices: false,
logLevel: 'debug',
logLevel: 'warn',
cacher: 'Memory',
validator: false,
heartbeatInterval: HEART_BEATS,
Expand Down
13 changes: 7 additions & 6 deletions packages/framework/src/utils/stats/accountTimeSeries.ts
Expand Up @@ -24,12 +24,12 @@ const { JobRunner } = Utils
/**
* Defines the account stats handler class.
*/
export class AccountTimeSeriesStatsManager {
export class AccountTimeSeriesStatsManager<V> {
protected compactionJob!: Utils.JobRunner
protected stats!: AccountStats
protected stats!: AccountStats<V>

constructor(
public config: AccountTimeSeriesStatsConfig,
public config: AccountTimeSeriesStatsConfig<V>,
protected indexerApi: IndexerMsI,
protected stateDAL: StatsStateStorage,
protected timeSeriesDAL: StatsTimeSeriesStorage,
Expand Down Expand Up @@ -71,15 +71,15 @@ export class AccountTimeSeriesStatsManager {
}
}

async getStats(): Promise<AccountStats> {
async getStats(): Promise<AccountStats<V>> {
if (!this.stats) {
await this.aggregateAccountStats(Date.now())
}

return this.stats
}

async process(now: number): Promise<void> {
console.log(`📊 processing time series stats for ${this.config.account}`)
await this.aggregateTimeSeries(now)
await this.aggregateAccountStats(now)
}
Expand Down Expand Up @@ -126,7 +126,8 @@ export class AccountTimeSeriesStatsManager {
const { timeSeriesDAL } = this

if (aggregate) {
const stats = await aggregate({ now, account, timeSeriesDAL })
console.log(`📊 aggregating account stats for ${account}`)
const stats: V = await aggregate({ now, account, timeSeriesDAL })
this.stats = { account, stats }
return
}
Expand Down