Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use graph_gateway::{
reports,
reports::KafkaClient,
subgraph_client, subgraph_studio, subscriptions_subgraph,
topology::{Deployment, GraphNetwork, Indexer},
topology::{Deployment, GraphNetwork},
vouchers, JsonResponse,
};
use indexer_selection::{
Expand Down Expand Up @@ -428,13 +428,7 @@ async fn write_indexer_inputs(
.into_iter()
.flat_map(|deployment| &deployment.indexers)
.filter(|indexer| indexer.id == indexing.indexer)
.map(
|Indexer {
largest_allocation,
allocated_tokens,
..
}| (*largest_allocation, *allocated_tokens),
)
.map(|indexer| (indexer.largest_allocation, indexer.allocated_tokens))
.collect();

receipt_pools
Expand Down
75 changes: 58 additions & 17 deletions graph-gateway/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use chrono::Utc;
use futures_util::future::join_all;
use itertools::Itertools;
use serde::Deserialize;

use prelude::{anyhow::anyhow, eventuals::EventualExt as _, tokio::sync::RwLock, *};
Expand All @@ -16,6 +17,7 @@ use crate::{ipfs, network_subgraph};
/// Eventually-consistent view of the Graph network topology, exposed as a set of lookup tables.
pub struct GraphNetwork {
    /// Subgraphs, keyed by their ID.
    pub subgraphs: Eventual<Ptr<HashMap<SubgraphId, Subgraph>>>,
    /// Deployments, keyed by their ID (which is also their IPFS hash).
    pub deployments: Eventual<Ptr<HashMap<DeploymentId, Arc<Deployment>>>>,
    /// Indexers, keyed by their ID (which is also their address).
    pub indexers: Eventual<Ptr<HashMap<Address, Arc<Indexer>>>>,
}

/// In an effort to keep the ownership structure a simple tree, this only contains the info required
Expand All @@ -36,14 +38,20 @@ pub struct Deployment {
pub version: Option<Arc<semver::Version>>,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical
/// allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: Vec<Indexer>,
pub indexers: Vec<Arc<Indexer>>,
/// A deployment may be associated with multiple subgraphs.
pub subgraphs: BTreeSet<SubgraphId>,
/// Indicates that the deployment should not be served directly by this gateway. This will
/// always be false when `allocations > 0`.
pub transferred_to_l2: bool,
}

/// An indexer's stake allocated toward a deployment. An indexer may have multiple active
/// allocations on a deployment; elsewhere these are collapsed into a single logical allocation
/// using the largest allocation ID and the sum of the allocated tokens.
pub struct Allocation {
    // On-chain allocation ID.
    pub id: Address,
    // Tokens allocated by the indexer under this allocation, in GRT.
    pub allocated_tokens: GRT,
    // The indexer that holds this allocation.
    pub indexer: Arc<Indexer>,
}

pub struct Indexer {
pub id: Address,
pub url: Url,
Expand All @@ -52,6 +60,20 @@ pub struct Indexer {
pub allocated_tokens: GRT,
}

impl Indexer {
    /// URL of the indexer's cost model endpoint.
    pub fn cost_url(&self) -> Url {
        self.endpoint("cost")
    }

    /// URL of the indexer's status endpoint.
    pub fn status_url(&self) -> Url {
        self.endpoint("status")
    }

    /// Join `path` onto the indexer's base URL.
    // Indexer URLs are validated when they are added to the network, so this should never fail.
    // 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
    fn endpoint(&self, path: &str) -> Url {
        self.url.join(path).unwrap().into()
    }
}

pub struct Manifest {
pub network: String,
pub features: Vec<String>,
Expand All @@ -70,9 +92,13 @@ impl GraphNetwork {
metadata: HashMap::new(),
})));

// Create a lookup table for subgraphs, keyed by their ID.
// Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let subgraphs = subgraphs.map(move |subgraphs| async move {
Ptr::new(Self::subgraphs(&subgraphs, cache, l2_transfer_delay).await)
});

// Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash).
let deployments = subgraphs.clone().map(|subgraphs| async move {
subgraphs
.values()
Expand All @@ -82,14 +108,26 @@ impl GraphNetwork {
.into()
});

// Create a lookup table for indexers, keyed by their ID (which is also their address).
let indexers = subgraphs.clone().map(|subgraphs| async move {
subgraphs
.values()
.flat_map(|subgraph| &subgraph.deployments)
.flat_map(|deployment| &deployment.indexers)
.map(|indexer| (indexer.id, indexer.clone()))
.collect::<HashMap<Address, Arc<Indexer>>>()
.into()
});

// Return only after eventuals have values, to avoid serving client queries prematurely.
if deployments.value().await.is_err() {
if deployments.value().await.is_err() || indexers.value().await.is_err() {
panic!("Failed to await Graph network topology");
}

Self {
subgraphs,
deployments,
indexers,
}
}

Expand Down Expand Up @@ -152,26 +190,28 @@ impl GraphNetwork {
.collect();

// extract indexer info from each allocation
let allocations: Vec<Indexer> = version
let indexers = version
.subgraph_deployment
.allocations
.iter()
.filter_map(|allocation| {
Some(Indexer {
id: allocation.indexer.id,
url: allocation.indexer.url.as_ref()?.parse().ok()?,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
largest_allocation: allocation.id,
allocated_tokens: allocation.allocated_tokens.change_precision(),
})
// If indexer URL parsing fails, the allocation is ignored (filtered out).
// 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let url = allocation.indexer.url.as_ref()?.parse().ok()?;

let id = allocation.indexer.id;
Some((
id,
Indexer {
id,
url,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
largest_allocation: allocation.id,
allocated_tokens: allocation.allocated_tokens.change_precision(),
},
))
})
.collect();
// TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552
use itertools::Itertools as _;
let indexers: Vec<Indexer> = allocations
.into_iter()
.map(|indexer| (*indexer.id, indexer))
.into_group_map()
.into_group_map() // TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552
.into_iter()
.filter_map(|(_, mut allocations)| {
let total_allocation: GRT = allocations.iter().map(|a| a.allocated_tokens).sum();
Expand All @@ -180,6 +220,7 @@ impl GraphNetwork {
indexer.allocated_tokens = total_allocation;
Some(indexer)
})
.map(Arc::new)
.collect();

let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2
Expand Down