Skip to content

Commit

Permalink
*: Remove all notion of node ids and subgraph node assignments
Browse files Browse the repository at this point in the history
- Leave subgraph deployment assignment decisions to the graph-node,
configurable in the graph-node.toml config file
  • Loading branch information
fordN committed Mar 8, 2024
1 parent 736ea10 commit 861e840
Show file tree
Hide file tree
Showing 12 changed files with 7 additions and 99 deletions.
4 changes: 0 additions & 4 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,11 @@ const setup = async () => {
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

const indexNodeIDs = ['node_1']

graphNode = new GraphNode(
logger,
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
'https://test-status-endpoint.xyz',
indexNodeIDs,
)

const networkSpecification = specification.NetworkSpecification.parse({
Expand Down Expand Up @@ -199,7 +196,6 @@ const setup = async () => {
indexerManagementClient = await createIndexerManagementClient({
models,
graphNode,
indexNodeIDs,
logger,
defaults: {
globalIndexingRule: {
Expand Down
13 changes: 5 additions & 8 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,17 +939,17 @@ export class Agent {
eligibleAllocations.map(allocation => allocation.subgraphDeployment.id),
)

// Identify which subgraphs to deploy and which to remove
// Identify which subgraphs to deploy and which to pause
const deploy = targetDeployments.filter(
deployment => !deploymentInList(activeDeployments, deployment),
)
const remove = activeDeployments.filter(
const pause = activeDeployments.filter(
deployment =>
!deploymentInList(targetDeployments, deployment) &&
!deploymentInList(eligibleAllocationDeployments, deployment),
)

if (deploy.length + remove.length !== 0) {
if (deploy.length + pause.length !== 0) {
logger.info('Deployment changes', {
indexingNetworkSubgraph,
syncing: activeDeployments.map(id => id.display),
Expand All @@ -958,14 +958,13 @@ export class Agent {
id => id.display,
),
deploy: deploy.map(id => id.display),
remove: remove.map(id => id.display),
pause: pause.map(id => id.display),
})
} else {
logger.debug('No deployment changes are necessary')
}

// ----------------------------------------------------------------------------------------
// Execute Deployments (Add, Remove)
// Execute Deployments (Add, Pause)
// ----------------------------------------------------------------------------------------

// Deploy/remove up to 10 subgraphs in parallel
Expand All @@ -982,8 +981,6 @@ export class Agent {
})

// Ensure the deployment is deployed to the indexer
// Note: we're not waiting here, as sometimes indexing a subgraph
// will block if the IPFS files cannot be retrieved
await this.graphNode.ensure(name, deployment)
}),
)
Expand Down
15 changes: 0 additions & 15 deletions packages/indexer-agent/src/commands/common-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,6 @@ import { parseDeploymentManagementMode } from '@graphprotocol/indexer-common'
// Injects all CLI options shared between this module's commands into a `yargs.Argv` object.
export function injectCommonStartupOptions(argv: Argv): Argv {
argv
.option('index-node-ids', {
description:
'Node IDs of Graph nodes to use for indexing (separated by commas)',
type: 'string',
array: true,
required: true,
coerce: (
arg, // TODO: we shouldn't need to coerce because yargs already separates values by space
) =>
arg.reduce(
(acc: string[], value: string) => [...acc, ...value.split(',')],
[],
),
group: 'Indexer Infrastructure',
})
.option('indexer-management-port', {
description: 'Port to serve the indexer management API at',
type: 'number',
Expand Down
2 changes: 0 additions & 2 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ export async function run(
argv.graphNodeAdminEndpoint,
argv.graphNodeQueryEndpoint,
argv.graphNodeStatusEndpoint,
argv.indexNodeIds,
)

// --------------------------------------------------------------------------------
Expand Down Expand Up @@ -572,7 +571,6 @@ export async function run(
const indexerManagementClient = await createIndexerManagementClient({
models: managementModels,
graphNode,
indexNodeIDs: argv.indexNodeIds,
logger,
defaults: {
globalIndexingRule: {
Expand Down
3 changes: 0 additions & 3 deletions packages/indexer-cli/src/__tests__/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,11 @@ export const setup = async () => {
sequelize = await sequelize.sync({ force: true })

const statusEndpoint = 'http://127.0.0.1:8030/graphql'
const indexNodeIDs = ['node_1']
const graphNode = new GraphNode(
logger,
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
statusEndpoint,
indexNodeIDs,
)

const network = await Network.create(
Expand Down Expand Up @@ -148,7 +146,6 @@ export const setup = async () => {
indexerManagementClient = await createIndexerManagementClient({
models,
graphNode,
indexNodeIDs,
logger,
defaults,
multiNetworks,
Expand Down
56 changes: 2 additions & 54 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,12 @@ export class GraphNode {
private queryBaseURL: URL
status: Client
logger: Logger
indexNodeIDs: string[]

constructor(
logger: Logger,
adminEndpoint: string,
queryEndpoint: string,
statusEndpoint: string,
indexNodeIDs: string[],
) {
this.logger = logger.child({ component: 'GraphNode' })
this.status = createClient({
Expand All @@ -97,8 +95,6 @@ export class GraphNode {
}

this.queryBaseURL = new URL(`/subgraphs/id/`, queryEndpoint)

this.indexNodeIDs = indexNodeIDs
}

async connect(): Promise<void> {
Expand Down Expand Up @@ -282,27 +278,20 @@ export class GraphNode {
}
}

async deploy(
name: string,
deployment: SubgraphDeploymentID,
node_id: string,
): Promise<void> {
async deploy(name: string, deployment: SubgraphDeploymentID): Promise<void> {
try {
this.logger.info(`Deploy subgraph deployment`, {
name,
deployment: deployment.display,
node_id,
})
const response = await this.admin.request('subgraph_deploy', {
name,
ipfs_hash: deployment.ipfsHash,
node_id: node_id,
})

this.logger.trace(`Response from 'subgraph_deploy' call`, {
deployment: deployment.display,
name,
node_id,
response,
})

Expand Down Expand Up @@ -354,29 +343,6 @@ export class GraphNode {
}
}

async pause(deployment: SubgraphDeploymentID): Promise<void> {
try {
this.logger.info(`Pause subgraph deployment`, {
deployment: deployment.display,
})
const response = await this.admin.request('subgraph_pause', {
deployment: deployment.ipfsHash,
})
if (response.error) {
throw response.error
}
this.logger.info(`Successfully paused subgraph deployment`, {
deployment: deployment.display,
})
} catch (error) {
const errorCode = IndexerErrorCode.IE027
this.logger.error(INDEXER_ERROR_MESSAGES[errorCode], {
deployment: deployment.display,
error: indexerError(errorCode, error),
})
}
}

async reassign(deployment: SubgraphDeploymentID, node: string): Promise<void> {
try {
this.logger.info(`Reassign subgraph deployment`, {
Expand Down Expand Up @@ -410,26 +376,8 @@ export class GraphNode {

async ensure(name: string, deployment: SubgraphDeploymentID): Promise<void> {
try {
// Randomly assign to unused nodes if they exist,
// otherwise use the node with lowest deployments assigned
const indexNodes = (await this.indexNodes()).filter(
(node: { id: string; deployments: Array<string> }) => {
return node.id && node.id !== 'removed'
},
)
const usedIndexNodeIDs = indexNodes.map((node) => node.id)
const unusedNodes = this.indexNodeIDs.filter(
(nodeID) => !(nodeID in usedIndexNodeIDs),
)

const targetNode = unusedNodes
? unusedNodes[Math.floor(Math.random() * unusedNodes.length)]
: indexNodes.sort((nodeA, nodeB) => {
return nodeA.deployments.length - nodeB.deployments.length
})[0].id
await this.create(name)
await this.deploy(name, deployment, targetNode)
await this.reassign(deployment, targetNode)
await this.deploy(name, deployment)
} catch (error) {
if (!(error instanceof IndexerError)) {
const errorCode = IndexerErrorCode.IE020
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const setup = async () => {
'https://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
'https://test-status-endpoint.xyz',
[],
)

const network = await Network.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ const setupMonitor = async () => {
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
statusEndpoint,
[],
)

const indexerOptions = spec.IndexerOptions.parse({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ export const createTestManagementClient = async (
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
statusEndpoint,
[],
)
const indexNodeIDs = ['node_1']

const networkSpecification = { ...testNetworkSpecification }
networkSpecification.dai.inject = injectDai
Expand Down Expand Up @@ -114,7 +112,6 @@ export const createTestManagementClient = async (
return await createIndexerManagementClient({
models: managementModels,
graphNode,
indexNodeIDs,
logger,
defaults,
multiNetworks,
Expand Down
4 changes: 0 additions & 4 deletions packages/indexer-common/src/indexer-management/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,6 @@ export interface IndexerManagementClientOptions {
logger: Logger
models: IndexerManagementModels
graphNode: GraphNode
// TODO:L2: Do we need this information? The GraphNode class auto-selects nodes based
// on availability.
// Ford: there were some edge cases where the GraphNode was not able to auto handle it on its own
indexNodeIDs: string[]
multiNetworks: MultiNetworks<Network> | undefined
defaults: IndexerManagementDefaults
}
Expand Down
2 changes: 0 additions & 2 deletions packages/indexer-service/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ export default {
'http://fake-graph-node-admin-endpoint',
argv.graphNodeQueryEndpoint,
argv.graphNodeStatusEndpoint,
argv.indexNodeIds,
)

const networkProvider = await Network.provider(
Expand Down Expand Up @@ -459,7 +458,6 @@ export default {
const indexerManagementClient = await createIndexerManagementClient({
models,
graphNode,
indexNodeIDs: ['node_1'], // This is just a dummy since the indexer-service doesn't manage deployments,
logger,
defaults: {
// This is just a dummy, since we're never writing to the management
Expand Down
2 changes: 0 additions & 2 deletions packages/indexer-service/src/server/__tests__/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ const setup = async () => {
'https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-testnet',
deployment: undefined,
})
const indexNodeIDs = ['node_1']
client = await createIndexerManagementClient({
models,
address,
contracts,
indexingStatusResolver,
indexNodeIDs,
deploymentManagementEndpoint: statusEndpoint,
networkSubgraph,
logger,
Expand Down

0 comments on commit 861e840

Please sign in to comment.