Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ const setup = async () => {
const network = await Network.create(
logger,
networkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
171 changes: 142 additions & 29 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,20 @@ export class Agent {
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// There should be a DipsManager in the operator
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
logger.debug('Ensuring indexing rules for DIPs', {
protocolNetwork: network.specification.networkIdentifier,
})
await operator.dipsManager.ensureAgreementRules()
} else {
logger.debug(
'DIPs is disabled, skipping indexing rule enforcement',
)
}
logger.trace('Fetching indexing rules', {
protocolNetwork: network.specification.networkIdentifier,
})
Expand Down Expand Up @@ -324,12 +338,21 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
// Skip fetching active deployments if the deployment management mode is manual, DIPs is disabled, and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
if (this.deploymentManagement === DeploymentManagementMode.AUTO) {
let dipsEnabled = false
await this.multiNetworks.map(async ({ network }) => {
if (network.specification.indexerOptions.enableDips) {
dipsEnabled = true
}
})
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
dipsEnabled
) {
logger.debug('Fetching active deployments')
const assignments =
await this.graphNode.subgraphDeploymentsAssignments(
Expand All @@ -338,7 +361,7 @@ export class Agent {
return assignments.map(assignment => assignment.id)
} else {
logger.info(
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled",
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and DIPs is disabled",
)
return []
}
Expand All @@ -351,37 +374,63 @@ export class Agent {
},
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.subgraphDeployments()
}),
{
onError: error =>
logger.warn(
`Failed to obtain network deployments, trying again later`,
{ error },
),
},
)
const networkAndDipsDeployments: Eventual<
NetworkMapped<SubgraphDeployment[]>
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching network deployments', {
protocolNetwork: network.specification.networkIdentifier,
})
const deployments = network.networkMonitor.subgraphDeployments()
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const resolvedDeployments = await deployments
const dipsDeployments = await Promise.all(
(await operator.dipsManager.getActiveDipsDeployments()).map(
deployment =>
network.networkMonitor.subgraphDeployment(
deployment.ipfsHash,
),
),
)
for (const deployment of dipsDeployments) {
if (
resolvedDeployments.find(
d => d.id.bytes32 === deployment.id.bytes32,
) == null
) {
resolvedDeployments.push(deployment)
}
}
return resolvedDeployments
}
return deployments
}),
{
onError: error =>
logger.warn(
`Failed to obtain network deployments, trying again later`,
{ error },
),
},
)

const networkDeploymentAllocationDecisions: Eventual<
NetworkMapped<AllocationDecision[]>
> = join({
networkDeployments,
networkAndDipsDeployments,
indexingRules,
}).tryMap(
async ({ indexingRules, networkDeployments }) => {
async ({ indexingRules, networkAndDipsDeployments }) => {
return this.multiNetworks.mapNetworkMapped(
this.multiNetworks.zip(indexingRules, networkDeployments),
this.multiNetworks.zip(indexingRules, networkAndDipsDeployments),
async (
{ network }: NetworkAndOperator,
[indexingRules, networkDeployments]: [
[indexingRules, networkAndDipsDeployments]: [
IndexingRuleAttributes[],
SubgraphDeployment[],
],
Expand All @@ -405,7 +454,11 @@ export class Agent {
logger.trace('Evaluating which deployments are worth allocating to')
return indexingRules.length === 0
? []
: evaluateDeployments(logger, networkDeployments, indexingRules)
: evaluateDeployments(
logger,
networkAndDipsDeployments,
indexingRules,
)
},
)
},
Expand Down Expand Up @@ -599,9 +652,42 @@ export class Agent {
}
break
case DeploymentManagementMode.MANUAL:
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
await this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// Reconcile DIPs deployments anyways
this.logger.warn(
`Deployment management is manual, but DIPs is enabled. Reconciling DIPs deployments anyways.`,
)
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const dipsDeployments =
await operator.dipsManager.getActiveDipsDeployments()
const newTargetDeployments = new Set([
...activeDeployments,
...dipsDeployments,
])
try {
await this.reconcileDeployments(
activeDeployments,
Array.from(newTargetDeployments),
eligibleAllocations,
)
} catch (err) {
logger.warn(
`Exited early while reconciling deployments. Skipped reconciling actions.`,
{
err: indexerError(IndexerErrorCode.IE005, err),
},
)
return
}
} else {
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
}
})
break
default:
throw new Error(
Expand All @@ -622,6 +708,28 @@ export class Agent {
})
return
}

await this.multiNetworks.mapNetworkMapped(
activeAllocations,
async ({ network, operator }, activeAllocations: Allocation[]) => {
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}

await operator.dipsManager.acceptPendingProposals(
activeAllocations,
)

this.logger.debug(
`Matching agreement allocations for network ${network.specification.networkIdentifier}`,
)
await operator.dipsManager.matchAgreementAllocations(
activeAllocations,
)
}
},
)
},
)
}
Expand Down Expand Up @@ -948,6 +1056,7 @@ export class Agent {
maxAllocationDuration: HorizonTransitionValue,
network: Network,
operator: Operator,
forceAction: boolean = false,
): Promise<void> {
const logger = this.logger.child({
deployment: deploymentAllocationDecision.deployment.ipfsHash,
Expand All @@ -971,6 +1080,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
case true: {
// If no active allocations and subgraph health passes safety check, create one
Expand Down Expand Up @@ -1008,6 +1118,7 @@ export class Agent {
deploymentAllocationDecision,
mostRecentlyClosedAllocation,
isHorizon,
forceAction,
)
}
} else if (activeDeploymentAllocations.length > 0) {
Expand All @@ -1016,6 +1127,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
} else {
// Refresh any expiring allocations
Expand All @@ -1032,6 +1144,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
expiringAllocations,
forceAction,
)
}
}
Expand Down
41 changes: 40 additions & 1 deletion packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
createIndexerManagementClient,
createIndexerManagementServer,
defineIndexerManagementModels,
definePendingRcaProposalModel,
defineQueryFeeModels,
GraphNode,
indexerError,
Expand Down Expand Up @@ -361,6 +362,26 @@ export const start = {
required: false,
group: 'Indexer Infrastructure',
})
.option('enable-dips', {
description: 'Whether to enable Indexing Fees (DIPs)',
type: 'boolean',
default: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dipper-endpoint', {
description: 'Gateway endpoint for DIPs receipts',
type: 'string',
array: false,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-allocation-amount', {
description: 'Amount of GRT to allocate for DIPs',
type: 'number',
default: 1,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -388,6 +409,9 @@ export const start = {
) {
return 'Invalid --rebate-claim-max-batch-size provided. Must be > 0 and an integer.'
}
if (argv['enable-dips'] && !argv['dipper-endpoint']) {
return 'Invalid --dipper-endpoint provided. Must be provided when --enable-dips is true.'
}
return true
})
},
Expand Down Expand Up @@ -428,6 +452,10 @@ export async function createNetworkSpecification(
maxProvisionInitialSize: argv.maxProvisionInitialSize,
finalityTime: argv.chainFinalizeTime,
legacyMnemonics: argv.legacyMnemonics,
enableDips: argv.enableDips,
dipperEndpoint: argv.dipperEndpoint,
dipsAllocationAmount: argv.dipsAllocationAmount,
dipsEpochsMargin: argv.dipsEpochsMargin,
}

const transactionMonitoring = {
Expand Down Expand Up @@ -643,6 +671,9 @@ export async function run(
await sequelize.sync()
logger.info(`Successfully synced database models`)

// Define after sync so Sequelize won't try to create/alter this indexer-rs-owned table
const pendingRcaModel = definePendingRcaProposalModel(sequelize)

// --------------------------------------------------------------------------------
// * Networks
// --------------------------------------------------------------------------------
Expand All @@ -653,7 +684,14 @@ export async function run(
const networks: Network[] = await pMap(
networkSpecifications,
async (spec: NetworkSpecification) =>
Network.create(logger, spec, queryFeeModels, graphNode, metrics),
Network.create(
logger,
spec,
managementModels,
queryFeeModels,
graphNode,
metrics,
),
)

// --------------------------------------------------------------------------------
Expand All @@ -676,6 +714,7 @@ export async function run(
},
},
multiNetworks,
pendingRcaModel,
})

// --------------------------------------------------------------------------------
Expand Down
Loading
Loading