diff --git a/src/fisherman_client.rs b/src/fisherman_client.rs index 122f51f6..be6dcec2 100644 --- a/src/fisherman_client.rs +++ b/src/fisherman_client.rs @@ -1,4 +1,4 @@ -use crate::{indexer_client::Attestation, indexer_selection::IndexerQuery, prelude::*}; +use crate::{indexer_client::Attestation, prelude::*}; use async_trait::async_trait; use reqwest; use serde::Deserialize; @@ -19,7 +19,9 @@ pub enum ChallengeOutcome { pub trait FishermanInterface { async fn challenge( &self, - indexer_query: &IndexerQuery, + indexer: &Address, + allocation: &Address, + indexer_query: &str, attestation: &Attestation, ) -> ChallengeOutcome; } @@ -34,10 +36,15 @@ pub struct FishermanClient { impl FishermanInterface for FishermanClient { async fn challenge( &self, - indexer_query: &IndexerQuery, + indexer: &Address, + allocation: &Address, + indexer_query: &str, attestation: &Attestation, ) -> ChallengeOutcome { - match self.send_challenge(&indexer_query, &attestation).await { + match self + .send_challenge(indexer, allocation, indexer_query, attestation) + .await + { Ok(outcome) => outcome, Err(fisherman_challenge_err) => { tracing::error!(%fisherman_challenge_err); @@ -54,7 +61,9 @@ impl FishermanClient { async fn send_challenge( &self, - indexer_query: &IndexerQuery, + indexer: &Address, + allocation: &Address, + indexer_query: &str, attestation: &Attestation, ) -> Result> { let challenge = serde_json::to_string(&json!({ @@ -62,12 +71,12 @@ impl FishermanClient { "id": 0, "method": "challenge", "params": { - "readOperation": &indexer_query.query, - "allocationID": indexer_query.allocation.to_string(), + "readOperation": indexer_query, + "allocationID": allocation.to_string(), "attestation": serde_json::to_value(attestation)?, }, }))?; - tracing::trace!(%challenge); + tracing::trace!(%indexer, %challenge); self.client .post(self.url.clone()) .header("Content-Type", "application/json") diff --git a/src/indexer_client.rs b/src/indexer_client.rs index 1620124c..f147ee41 100644 --- a/src/indexer_client.rs +++ b/src/indexer_client.rs @@ -12,7 +12,7 @@ pub trait IndexerInterface { async fn query_indexer(&self, query: &IndexerQuery) -> Result; } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct IndexerResponse { pub status: u16, pub payload: String, diff --git a/src/indexer_selection/mod.rs b/src/indexer_selection/mod.rs index c435bf2d..07684b66 100644 --- a/src/indexer_selection/mod.rs +++ b/src/indexer_selection/mod.rs @@ -96,7 +96,7 @@ impl From for SelectionError { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum IndexerError { Timeout, NoAttestation, @@ -259,7 +259,7 @@ impl Indexers { pub async fn observe_indexing_behind( &self, context: &mut Context<'_>, - query: &IndexerQuery, + indexing: &Indexing, block_resolver: &BlockResolver, ) { // Get this early to be closer to the time when the query was made so @@ -268,7 +268,7 @@ impl Indexers { let latest = block_resolver.latest_block().map(|b| b.number).unwrap_or(0); let freshness_requirements = freshness_requirements(&mut context.operations, block_resolver).await; - let selection_factors = match self.indexings.get(&query.indexing).await { + let selection_factors = match self.indexings.get(indexing).await { Some(selection_factors) => selection_factors, None => return, }; diff --git a/src/main.rs b/src/main.rs index 344ce26c..d43e5716 100644 --- a/src/main.rs +++ b/src/main.rs @@ -344,18 +344,25 @@ struct SubgraphQueryData { } impl SubgraphQueryData { - fn resolve_subgraph_deployment(&self, subgraph_identifier: &str) -> Option> { - let deployment = if let Ok(subgraph) = subgraph_identifier.parse::() { + fn resolve_subgraph_deployment( + &self, + params: &actix_web::dev::Path, + ) -> Result { + if let Some(id) = params.get("subgraph_id") { + let subgraph = id + .parse::() + .ok() + .ok_or_else(|| id.to_string())?; self.inputs .current_deployments .value_immediate() - .and_then(|map| map.get(&subgraph).cloned())? + .and_then(|map| map.get(&subgraph).cloned()) + .ok_or_else(|| id.to_string()) + } else if let Some(id) = params.get("deployment_id") { + SubgraphDeploymentID::from_ipfs_hash(id).ok_or_else(|| id.to_string()) } else { - SubgraphDeploymentID::from_ipfs_hash(&subgraph_identifier)? - }; - self.subgraph_info - .value_immediate() - .and_then(|map| map.get(&deployment)?.value_immediate()) + Err("".to_string()) + } } } @@ -364,54 +371,91 @@ async fn handle_subgraph_query( payload: web::Json, data: web::Data, ) -> HttpResponse { - let url_params = request.match_info(); - - let subgraph_info = match url_params - .get("subgraph_id") - .and_then(|id| data.resolve_subgraph_deployment(id)) - { - Some(subgraph) => subgraph, - None => { - return graphql_error_response(StatusCode::BAD_REQUEST, "Invalid subgraph identifier") - } - }; - - let t0 = Instant::now(); - let query_id = QueryID::new(); let ray_id = request .headers() .get("cf-ray") .and_then(|value| value.to_str().ok()) .unwrap_or("") .to_string(); - let span = - tracing::info_span!("handle_subgraph_query", %ray_id, %query_id, %subgraph_info.deployment); - let response = handle_subgraph_query_inner(request, payload, data, query_id, subgraph_info) - .instrument(span.clone()) + let variables = payload.variables.as_ref().map(ToString::to_string); + let mut query = Query::new(ray_id, payload.into_inner().query, variables); + // We check that the requested subgraph is valid now, since we don't want to log query info for + // unknown subgraphs requests. + let deployment = match data.resolve_subgraph_deployment(request.match_info()) { + Ok(subgraph) => subgraph, + Err(invalid_subgraph) => { + tracing::info!(%invalid_subgraph); + return graphql_error_response(StatusCode::BAD_REQUEST, "Invalid subgraph identifier"); + } + }; + query.subgraph = data + .subgraph_info + .value_immediate() + .and_then(|map| map.get(&deployment)?.value_immediate()); + if query.subgraph == None { + tracing::info!(%deployment); + return graphql_error_response(StatusCode::NOT_FOUND, "Subgraph not found"); + } + let span = tracing::info_span!( + "handle_subgraph_query", + ray_id = %query.ray_id, + query_id = %query.id, + deployment = %query.subgraph.as_ref().unwrap().deployment, + network = %query.subgraph.as_ref().unwrap().network, + ); + let response = handle_subgraph_query_inner(request, data, &mut query) + .instrument(span) .await; - let response_time = Instant::now() - t0; let (payload, status) = match response { Ok(payload) => { let status = payload.status().to_string(); (payload, status) } - Err((status, msg)) => (graphql_error_response(status, &msg), msg), + Err((status, msg)) => ( + graphql_error_response(status, &msg), + format!("{}: {}", status, msg), + ), }; tracing::info!( - parent: &span, + ray_id = %query.ray_id, + query_id = %query.id, + deployment = %query.subgraph.as_ref().unwrap().deployment, + network = %query.subgraph.as_ref().unwrap().network, + api_key = %query.api_key.as_ref().unwrap().key, + query = %query.query, + variables = %query.variables.as_deref().unwrap_or_default(), + response_time_ms = (Instant::now() - query.start_time).as_millis() as u32, %status, - response_time_ms = response_time.as_millis() as u32, - "client query result", + "Client query result", ); + for (attempt_index, attempt) in query.indexer_attempts.iter().enumerate() { + let status = match &attempt.result { + Ok(response) => response.status.to_string(), + Err(err) => err.to_string(), + }; + tracing::info!( + ray_id = %query.ray_id, + query_id = %query.id, + attempt_index, + indexer = %attempt.indexer, + allocation = %attempt.allocation, + fee = %attempt.score.fee, + utility = *attempt.score.utility, + blocks_behind = attempt.score.blocks_behind, + response_time_ms = attempt.duration.as_millis() as u32, + %status, + rejection = %attempt.rejection.as_deref().unwrap_or_default(), + "Indexer attempt", + ); + } + payload } async fn handle_subgraph_query_inner( request: HttpRequest, - payload: web::Json, data: web::Data, - query_id: QueryID, - subgraph: Ptr, + query: &mut Query, ) -> Result { let query_engine = QueryEngine::new( data.config.clone(), @@ -455,7 +499,8 @@ async fn handle_subgraph_query_inner( with_metric(&METRICS.unauthorized_domain, &[&api_key.key], |c| c.inc()); return Err((StatusCode::OK, "Domain not authorized by API key".into())); } - if !api_key.deployments.is_empty() && !api_key.deployments.contains(&subgraph.deployment) { + let deployment = &query.subgraph.as_ref().unwrap().deployment.clone(); + if !api_key.deployments.is_empty() && !api_key.deployments.contains(&deployment) { with_metric( &METRICS.queries_unauthorized_deployment, &[&api_key.key], @@ -463,52 +508,41 @@ async fn handle_subgraph_query_inner( ); return Err((StatusCode::OK, "Subgraph not authorized by API key".into())); } - - let query = ClientQuery { - id: query_id, - api_key: api_key.clone(), - query: payload.query.clone(), - variables: payload.variables.as_ref().map(ToString::to_string), - subgraph: subgraph.clone(), - }; - let result = match query_engine.execute_query(query).await { - Ok(result) => result, - Err(err) => { - return Err(( - StatusCode::OK, - match err { - QueryEngineError::MalformedQuery => "Invalid query".into(), - QueryEngineError::NoIndexers => { - "No indexers found for subgraph deployment".into() - } - QueryEngineError::NoIndexerSelected => { - "No suitable indexer found for subgraph deployment".into() - } - QueryEngineError::FeesTooHigh(count) => { - format!("No suitable indexer found, {} indexers requesting higher fees for this query", count) - } - QueryEngineError::MissingBlock(_) => { - "Gateway failed to resolve required blocks".into() - } - }, - )) - } - }; + if let Err(err) = query_engine.execute_query(query).await { + return Err(( + StatusCode::OK, + match err { + QueryEngineError::MalformedQuery => "Invalid query".into(), + QueryEngineError::NoIndexers => "No indexers found for subgraph deployment".into(), + QueryEngineError::NoIndexerSelected => { + "No suitable indexer found for subgraph deployment".into() + } + QueryEngineError::FeesTooHigh(count) => { + format!("No suitable indexer found, {} indexers requesting higher fees for this query", count) + } + QueryEngineError::MissingBlock(_) => { + "Gateway failed to resolve required blocks".into() + } + }, + )); + } + let last_attempt = query.indexer_attempts.last().unwrap(); + let response = last_attempt.result.as_ref().unwrap(); if let Ok(hist) = METRICS .query_result_size - .get_metric_with_label_values(&[&result.query.indexing.deployment.ipfs_hash()]) + .get_metric_with_label_values(&[&deployment.ipfs_hash()]) { - hist.observe(result.response.payload.len() as f64); + hist.observe(response.payload.len() as f64); } let _ = data.stats_db.send(stats_db::Msg::AddQuery { api_key, - fee: result.query.score.fee, + fee: last_attempt.score.fee, domain: domain.to_string(), - subgraph: subgraph.deployment.ipfs_hash(), + subgraph: query.subgraph.as_ref().unwrap().deployment.ipfs_hash(), }); Ok(HttpResponseBuilder::new(StatusCode::OK) .insert_header(header::ContentType::json()) - .body(result.response.payload)) + .body(&response.payload)) } pub fn graphql_error_response(status: StatusCode, message: S) -> HttpResponse { diff --git a/src/query_engine/mod.rs b/src/query_engine/mod.rs index 61674ffc..32f65dae 100644 --- a/src/query_engine/mod.rs +++ b/src/query_engine/mod.rs @@ -6,7 +6,7 @@ use crate::{ fisherman_client::*, indexer_client::*, indexer_selection::{ - self, Context, IndexerError, IndexerQuery, IndexerScore, Indexers, SelectionError, + self, Context, IndexerError, IndexerQuery, IndexerScore, Indexers, Receipt, SelectionError, UnresolvedBlock, }, manifest_client::SubgraphInfo, @@ -22,6 +22,7 @@ use prometheus; use serde_json::value::RawValue; use std::{ collections::HashMap, + rc::Rc, sync::{ atomic::{AtomicUsize, Ordering as MemoryOrdering}, Arc, @@ -29,16 +30,45 @@ use std::{ }; use uuid::Uuid; -#[derive(Clone, Debug)] -pub struct ClientQuery { +#[derive(Debug)] +pub struct Query { pub id: QueryID, - pub api_key: Arc, - pub subgraph: Ptr, - pub query: String, - pub variables: Option, + pub ray_id: String, + pub start_time: Instant, + pub query: Rc, + pub variables: Rc>, + pub api_key: Option>, + pub subgraph: Option>, + pub indexer_attempts: Vec, +} + +impl Query { + pub fn new(ray_id: String, query: String, variables: Option) -> Self { + Self { + id: QueryID::new(), + start_time: Instant::now(), + ray_id, + query: Rc::new(query), + variables: Rc::new(variables), + api_key: None, + subgraph: None, + indexer_attempts: Vec::new(), + } + } +} + +#[derive(Clone, Debug)] +pub struct IndexerAttempt { + pub score: IndexerScore, + pub indexer: Address, + pub allocation: Address, + pub query: Arc, + pub receipt: Receipt, + pub result: Result, + pub rejection: Option, + pub duration: Duration, } -#[derive(Clone, Copy)] pub struct QueryID { local_id: u64, } @@ -84,7 +114,7 @@ pub struct QueryResponse { pub response: IndexerResponse, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum QueryEngineError { NoIndexers, NoIndexerSelected, @@ -190,68 +220,58 @@ where } } - pub async fn execute_query( - &self, - query: ClientQuery, - ) -> Result { - tracing::debug!( - deployment = ?query.subgraph.deployment, - network = %query.subgraph.network, + pub async fn execute_query(&self, query: &mut Query) -> Result<(), QueryEngineError> { + let api_key = query.api_key.as_ref().unwrap().key.clone(); + let deployment_id = query.subgraph.as_ref().unwrap().deployment.ipfs_hash(); + let _timer = with_metric( + &METRICS.queries.duration, + &[&deployment_id, &api_key], + |h| h.start_timer(), ); - let api_key = query.api_key.key.clone(); - let query_start = Instant::now(); - let deployment_ipfs = query.subgraph.deployment.ipfs_hash(); - let result = self.execute_deployment_query(query, &deployment_ipfs).await; + let result = self.execute_deployment_query(query, &deployment_id).await; let result_counter = if let Ok(_) = result { &METRICS.queries.ok } else { &METRICS.queries.failed }; - with_metric(result_counter, &[&deployment_ipfs, &api_key], |counter| { - counter.inc() - }); - let query_execution_duration = Instant::now() - query_start; - with_metric( - &METRICS.queries.duration, - &[&deployment_ipfs, &api_key], - |hist| { - hist.observe(query_execution_duration.as_secs_f64()); - }, - ); - tracing::info!(query_execution_duration_ms = query_execution_duration.as_millis() as u32); + with_metric(result_counter, &[&deployment_id, &api_key], |c| c.inc()); result } - #[tracing::instrument(skip(self, query, deployment_ipfs))] + #[tracing::instrument(skip(self, query, deployment_id))] async fn execute_deployment_query( &self, - query: ClientQuery, - deployment_ipfs: &str, - ) -> Result { + query: &mut Query, + deployment_id: &str, + ) -> Result<(), QueryEngineError> { use QueryEngineError::*; + let subgraph = query.subgraph.as_ref().unwrap().clone(); let mut indexers = self .deployment_indexers .value_immediate() - .and_then(|map| map.get(&query.subgraph.deployment).cloned()) + .and_then(|map| map.get(&subgraph.deployment).cloned()) .unwrap_or_default(); tracing::debug!( - deployment = ?query.subgraph.deployment, deployment_indexers = indexers.len(), + deployment = ?subgraph.deployment, deployment_indexers = indexers.len(), ); if indexers.is_empty() { return Err(NoIndexers); } let _execution_timer = with_metric( &METRICS.query_execution_duration, - &[&deployment_ipfs], + &[&deployment_id], |hist| hist.start_timer(), ); - let mut context = - Context::new(&query.query, query.variables.as_deref().unwrap_or_default()) - .map_err(|_| MalformedQuery)?; + let query_body = query.query.clone(); + let query_variables = query.variables.clone(); + let mut context = Context::new(&query_body, query_variables.as_deref().unwrap_or_default()) + .map_err(|_| MalformedQuery)?; + + // let mut context = query.context().ok_or(MalformedQuery)?; let block_resolver = self .block_resolvers - .get(&query.subgraph.network) + .get(&subgraph.network) .ok_or(MissingBlock(UnresolvedBlock::WithNumber(0)))?; let freshness_requirements = Indexers::freshness_requirements(&mut context, block_resolver).await?; @@ -259,7 +279,7 @@ where for retry_count in 0..self.config.indexer_selection_retry_limit { let selection_timer = with_metric( &METRICS.indexer_selection_duration, - &[&deployment_ipfs], + &[&deployment_id], |hist| hist.start_timer(), ); @@ -284,8 +304,8 @@ where .indexers .select_indexer( &self.config.utility, - &query.subgraph.network, - &query.subgraph.deployment, + &subgraph.network, + &subgraph.deployment, &indexers, &mut context, &block_resolver, @@ -306,30 +326,31 @@ where Err(err) => return Err(err.into()), }; Self::log_indexer_score( - indexer_query.indexing.indexer, + &query, + &indexer_query.indexing.indexer, &indexer_query.score, "Selected indexer score", ); match scoring_sample.0 { Some((indexer, Ok(score))) => { - Self::log_indexer_score(indexer, &score, "ISA scoring sample") + Self::log_indexer_score(&query, &indexer, &score, "ISA scoring sample") } - Some((indexer, Err(scoring_err))) => { - tracing::info!(?indexer, ?scoring_err, "ISA scoring sample") + Some((indexer, Err(err))) => { + Self::log_indexer_score_err(&query, &indexer, err, "ISA scoring sample") } _ => (), }; let indexer = indexer_query.indexing.indexer; let result = self .execute_indexer_query( - &query, + query, indexer_query, - deployment_ipfs, + deployment_id, &mut context, block_resolver, - retry_count, ) .await; + assert_eq!(retry_count + 1, query.indexer_attempts.len()); match result { Ok(response) => return Ok(response), Err(RemoveIndexer::No) => (), @@ -343,70 +364,86 @@ where Err(NoIndexerSelected) } - fn log_indexer_score(indexer: Address, score: &IndexerScore, message: &'static str) { + fn log_indexer_score( + query: &Query, + indexer: &Address, + score: &IndexerScore, + message: &'static str, + ) { tracing::info!( - ?indexer, - fee = ?score.fee, - slashable = ?score.slashable, - utility = %score.utility, - economic_security = %score.utility_scores.economic_security, - price_efficiency = %score.utility_scores.price_efficiency, - data_freshness = %score.utility_scores.data_freshness, - performance = %score.utility_scores.performance, - reputation = %score.utility_scores.reputation, - sybil = %score.sybil, - blocks_behind = ?score.blocks_behind, + ray_id = %query.ray_id, + query_id = %query.id, + deployment = %query.subgraph.as_ref().unwrap().deployment, + %indexer, + fee = %score.fee, + slashable = %score.slashable, + utility = *score.utility, + economic_security = score.utility_scores.economic_security, + price_efficiency = score.utility_scores.price_efficiency, + data_freshness = score.utility_scores.data_freshness, + performance = score.utility_scores.performance, + reputation = score.utility_scores.reputation, + sybil = *score.sybil, + blocks_behind = score.blocks_behind, message, ); } + fn log_indexer_score_err( + query: &Query, + indexer: &Address, + scoring_err: SelectionError, + message: &'static str, + ) { + tracing::info!( + ray_id = %query.ray_id, + query_id = %query.id, + deployment = %query.subgraph.as_ref().unwrap().deployment, + %indexer, + ?scoring_err, + message, + ) + } + async fn execute_indexer_query( &self, - query: &ClientQuery, + query: &mut Query, indexer_query: IndexerQuery, - deployment_ipfs: &str, + deployment_id: &str, context: &mut Context<'_>, block_resolver: &BlockResolver, - retry_count: usize, - ) -> Result { + ) -> Result<(), RemoveIndexer> { let indexer_id = indexer_query.indexing.indexer.to_string(); tracing::info!(indexer = %indexer_id); - self.observe_indexer_selection_metrics(&query.subgraph.deployment, &indexer_query); + self.observe_indexer_selection_metrics(deployment_id, &indexer_query); let t0 = Instant::now(); let result = self.indexer_client.query_indexer(&indexer_query).await; let query_duration = Instant::now() - t0; with_metric( &METRICS.indexer_request.duration, - &[&deployment_ipfs, &indexer_id], + &[&deployment_id, &indexer_id], |hist| hist.observe(query_duration.as_secs_f64()), ); - - let indexer_response_status = match &result { - Ok(response) => response.status.to_string(), - Err(err) => err.to_string(), - }; - tracing::info!( - api_key = %query.api_key.key, - indexer = %indexer_query.indexing.indexer, - indexer_url = %indexer_query.score.url, - fee = %indexer_query.score.fee, - blocks_behind = ?indexer_query.score.blocks_behind, - indexer_query_duration_ms = query_duration.as_millis() as u32, - %indexer_response_status, - indexer_query = %indexer_query.query, - %retry_count, - "indexer query result", - ); - - let response = match result { + query.indexer_attempts.push(IndexerAttempt { + score: indexer_query.score, + indexer: indexer_query.indexing.indexer, + allocation: indexer_query.allocation, + query: Arc::new(indexer_query.query), + receipt: indexer_query.receipt, + result: result, + rejection: None, + duration: query_duration, + }); + let result = &query.indexer_attempts.last().unwrap(); + let response = match &result.result { Ok(response) => response, Err(err) => { self.indexers - .observe_failed_query(&indexer_query.indexing, &indexer_query.receipt, err) + .observe_failed_query(&indexer_query.indexing, &result.receipt, err.clone()) .await; with_metric( &METRICS.indexer_request.failed, - &[&deployment_ipfs, &indexer_id], + &[&deployment_id, &indexer_id], |counter| counter.inc(), ); return Err(RemoveIndexer::Yes); @@ -414,16 +451,17 @@ where }; with_metric( &METRICS.indexer_request.ok, - &[&deployment_ipfs, &indexer_id], + &[&deployment_id, &indexer_id], |counter| counter.inc(), ); - if !query.subgraph.features.is_empty() && response.attestation.is_none() { + let subgraph = query.subgraph.as_ref().unwrap(); + if !subgraph.features.is_empty() && response.attestation.is_none() { tracing::info!(indexer_response_err = "Attestable response has no attestation"); self.indexers .observe_failed_query( &indexer_query.indexing, - &indexer_query.receipt, + &result.receipt, IndexerError::NoAttestation, ) .await; @@ -431,63 +469,62 @@ where } if let Err(remove_indexer) = self - .check_unattestable_responses(context, &block_resolver, &indexer_query, &response) + .check_unattestable_responses( + context, + &block_resolver, + &indexer_query.indexing, + &result.receipt, + &response, + ) .await { with_metric( &METRICS.indexer_response_unattestable, - &[&deployment_ipfs, &indexer_id], + &[&deployment_id, &indexer_id], |counter| counter.inc(), ); return Err(remove_indexer); } if let Some(attestation) = &response.attestation { - self.challenge_indexer_response(indexer_query.clone(), attestation.clone()); + self.challenge_indexer_response( + indexer_query.indexing.clone(), + result.allocation.clone(), + result.query.clone(), + attestation.clone(), + ); } self.indexers - .observe_successful_query( - &indexer_query.indexing, - query_duration, - &indexer_query.receipt, - ) + .observe_successful_query(&indexer_query.indexing, query_duration, &result.receipt) .await; - Ok(QueryResponse { - query: indexer_query, - response, - }) + Ok(()) } - fn observe_indexer_selection_metrics( - &self, - deployment: &SubgraphDeploymentID, - selection: &IndexerQuery, - ) { - let deployment = deployment.ipfs_hash(); + fn observe_indexer_selection_metrics(&self, deployment: &str, selection: &IndexerQuery) { let metrics = &METRICS.indexer_selection; if let Ok(hist) = metrics .blocks_behind - .get_metric_with_label_values(&[&deployment]) + .get_metric_with_label_values(&[deployment]) { hist.observe(selection.score.blocks_behind as f64); } - if let Ok(hist) = metrics.fee.get_metric_with_label_values(&[&deployment]) { + if let Ok(hist) = metrics.fee.get_metric_with_label_values(&[deployment]) { hist.observe(selection.score.fee.as_f64()); } if let Ok(counter) = metrics .indexer_selected - .get_metric_with_label_values(&[&deployment, &selection.indexing.indexer.to_string()]) + .get_metric_with_label_values(&[deployment, &selection.indexing.indexer.to_string()]) { counter.inc(); } if let Ok(hist) = metrics .slashable_dollars - .get_metric_with_label_values(&[&deployment]) + .get_metric_with_label_values(&[deployment]) { hist.observe(selection.score.slashable.as_f64()); } - if let Ok(hist) = metrics.utility.get_metric_with_label_values(&[&deployment]) { + if let Ok(hist) = metrics.utility.get_metric_with_label_values(&[deployment]) { hist.observe(*selection.score.utility); } } @@ -496,7 +533,8 @@ where &self, context: &mut Context<'_>, block_resolver: &BlockResolver, - indexer_query: &IndexerQuery, + indexing: &Indexing, + receipt: &Receipt, response: &IndexerResponse, ) -> Result<(), RemoveIndexer> { // Special-casing for a few known indexer errors; the block scope here @@ -511,7 +549,7 @@ where ) { tracing::info!(indexer_response_err = "indexing behind"); self.indexers - .observe_indexing_behind(context, indexer_query, block_resolver) + .observe_indexing_behind(context, indexing, block_resolver) .await; return Err(RemoveIndexer::No); } @@ -519,11 +557,7 @@ where if indexer_response_has_error(&parsed_response, "panic processing query") { tracing::info!(indexer_response_err = "panic processing query"); self.indexers - .observe_failed_query( - &indexer_query.indexing, - &indexer_query.receipt, - IndexerError::NondeterministicResponse, - ) + .observe_failed_query(indexing, receipt, IndexerError::NondeterministicResponse) .await; return Err(RemoveIndexer::Yes); } @@ -531,14 +565,22 @@ where Ok(()) } - fn challenge_indexer_response(&self, indexer_query: IndexerQuery, attestation: Attestation) { + fn challenge_indexer_response( + &self, + indexing: Indexing, + allocation: Address, + indexer_query: Arc, + attestation: Attestation, + ) { let fisherman = match &self.fisherman_client { Some(fisherman) => fisherman.clone(), None => return, }; let indexers = self.indexers.clone(); tokio::spawn(async move { - let outcome = fisherman.challenge(&indexer_query, &attestation).await; + let outcome = fisherman + .challenge(&indexing.indexer, &allocation, &indexer_query, &attestation) + .await; tracing::trace!(?outcome); let penalty = match outcome { ChallengeOutcome::Unknown | ChallengeOutcome::AgreeWithTrustedIndexer => 0, @@ -548,7 +590,7 @@ where }; if penalty > 0 { tracing::info!(?outcome, "penalizing for challenge outcome"); - indexers.penalize(&indexer_query.indexing, penalty).await; + indexers.penalize(&indexing, penalty).await; } }); } diff --git a/src/query_engine/tests.rs b/src/query_engine/tests.rs index 1fb9365a..63b1118a 100644 --- a/src/query_engine/tests.rs +++ b/src/query_engine/tests.rs @@ -155,7 +155,7 @@ impl Topology { Arc::new(resolvers) } - fn gen_query(&mut self) -> ClientQuery { + fn gen_query(&mut self) -> Query { let deployment = self .deployments() .into_iter() @@ -163,18 +163,15 @@ impl Topology { .choose(&mut self.rng) .unwrap() .clone(); - let query = if self.flip_coin(32) { "?" } else { BASIC_QUERY }; - ClientQuery { - id: QueryID::new(), - api_key: Arc::new(APIKey::default()), - query: query.into(), - variables: None, - subgraph: Ptr::new(SubgraphInfo { - deployment: deployment.id, - network: deployment.network, - features: vec![], - }), - } + let query_body = if self.flip_coin(32) { "?" } else { BASIC_QUERY }; + let mut query = Query::new("".into(), query_body.into(), None); + query.api_key = Some(Arc::new(APIKey::default())); + query.subgraph = Some(Ptr::new(SubgraphInfo { + deployment: deployment.id, + network: deployment.network, + features: vec![], + })); + query } fn gen_network(&mut self) -> NetworkTopology { @@ -390,55 +387,27 @@ impl Topology { eventuals::idle().await; } - async fn check_result( + fn check_result( &self, - query: ClientQuery, - result: Result, + query: &Query, + result: Result<(), QueryEngineError>, ) -> Result<(), Vec> { - fn err_with(mut trace: Vec, err: S) -> Result<(), Vec> { - trace.push(err.to_string()); - Err(trace.clone()) - } let mut trace = Vec::new(); + trace.push(format!("result: {:?}", result)); trace.push(format!("{:#?}", query)); - trace.push(format!("{:#?}", result)); - // Return MalformedQuery if query is invalid. - if let Err(QueryEngineError::MalformedQuery) = result { - if query.query == "?" { - return Ok(()); - } - } + + let subgraph = query.subgraph.as_ref().unwrap(); let deployment = self .deployments() .into_iter() - .find(|deployment| &deployment.id == &query.subgraph.deployment) + .find(|deployment| &deployment.id == &subgraph.deployment) .unwrap(); - trace.push(format!("{:#?}", deployment)); let indexers = deployment .indexings .iter() .map(|id| self.indexers.get(id).unwrap()) .collect::>(); - // Return NoIndexers if no indexers exist for this deployment. - if indexers.is_empty() { - if let Err(QueryEngineError::NoIndexers) = result { - return Ok(()); - } - return err_with(trace, format!("expected NoIndexers, got {:#?}", result)); - } - // Return MissingBlock if the network has no blocks. - if self - .networks - .get(&query.subgraph.network) - .unwrap() - .blocks - .is_empty() - { - if let Err(QueryEngineError::MissingBlock(_)) = result { - return Ok(()); - } - return err_with(trace, format!("expected MissingBlock, got {:?}", result)); - } + // Valid indexers have the following properties: fn valid_indexer(indexer: &IndexerTopology) -> bool { // no failure to indexing the subgraph @@ -456,53 +425,146 @@ impl Topology { .filter(|indexer| valid_indexer(indexer)) .collect::>(); trace.push(format!("valid indexers: {:#?}", valid)); - if valid.is_empty() { - let high_fee_count = indexers - .iter() - .filter(|indexer| indexer.fee > TokenAmount::Enough) - .count(); - // Return FeesTooHigh if no valid indexers, due to high fees. - if high_fee_count > 0 { - return match result { - Err(QueryEngineError::FeesTooHigh(count)) if count == high_fee_count => Ok(()), - _ => err_with( - trace, - format!( - "expected FeesTooHigh({}), got {:#?}", - high_fee_count, result - ), - ), - }; - } - // Return NoIndexerSelected if no valid indexers were found. - if let Err(QueryEngineError::NoIndexerSelected) = result { - return Ok(()); + + if query.indexer_attempts.is_empty() { + return self + .check_no_attempts(&mut trace, query, &result, &subgraph, &indexers, &valid); + } + + let mut failed_attempts = query.indexer_attempts.clone(); + let last_attempt = failed_attempts.pop(); + let success_check = + last_attempt + .as_ref() + .and_then(|attempt| match (&attempt.result, &attempt.rejection) { + (Ok(_), None) => { + Some(self.check_successful_attempt(&mut trace, &result, &valid, &attempt)) + } + _ => None, + }); + match last_attempt { + Some(attempt) if success_check.is_none() => failed_attempts.push(attempt), + _ => (), + }; + for attempt in &failed_attempts { + if let Err(trace) = self.check_failed_attempt(&mut trace, &indexers, &valid, &attempt) { + return Err(trace); } - return err_with( + } + match success_check { + Some(result) => result, + None => Ok(()), + } + } + + fn check_no_attempts( + &self, + trace: &mut Vec, + query: &Query, + result: &Result<(), QueryEngineError>, + subgraph: &SubgraphInfo, + indexers: &[&IndexerTopology], + valid: &[&IndexerTopology], + ) -> Result<(), Vec> { + use QueryEngineError::*; + if indexers.is_empty() { + return Self::expect_err(trace, result, NoIndexers); + } + if query.query.as_ref() == "?" { + return Self::expect_err(trace, result, MalformedQuery); + } + if self + .networks + .get(&subgraph.network) + .unwrap() + .blocks + .is_empty() + { + return Self::expect_err(trace, result, MissingBlock(UnresolvedBlock::WithNumber(0))); + } + + if !valid.is_empty() { + return Self::err_with( trace, - format!("expected NoIndexerSelected, got {:#?}", result), + format!("expected no valid indexer, got {}", valid.len()), ); } - // A valid indexer implies that a response is returned. - let response = match result { - Ok(response) => response, - Err(err) => return err_with(trace, format!("expected response, got {:?}", err)), - }; - // The test resolver only gives the following response for successful queries. - if !response.response.payload.contains("success") { - return err_with( + + let high_fee_count = indexers + .iter() + .filter(|indexer| indexer.fee > TokenAmount::Enough) + .count(); + if high_fee_count > 0 { + return Self::expect_err(trace, result, QueryEngineError::FeesTooHigh(high_fee_count)); + } + Self::expect_err(trace, result, NoIndexerSelected) + } + + fn check_failed_attempt( + &self, + trace: &mut Vec, + indexers: &[&IndexerTopology], + valid: &[&IndexerTopology], + attempt: &IndexerAttempt, + ) -> Result<(), Vec> { + if let (Ok(_), None) = (&attempt.result, &attempt.rejection) { + return Self::err_with( trace, - format!("expected success, got {:#?}", response.response), + format!("expected indexer query error, got: {:#?}", attempt), ); } - // The response is from a valid indexer. - if let Some(_) = valid - .into_iter() - .find(|indexer| response.query.indexing.indexer == indexer.id) - { - return Ok(()); + if !indexers.iter().any(|indexer| indexer.id == attempt.indexer) { + return Self::err_with( + trace, + format!("attempted indexer not available: {:?}", attempt.indexer), + ); } - err_with(trace, "response did not match any valid indexer") + if valid.iter().any(|indexer| indexer.id == attempt.indexer) { + return Self::err_with( + trace, + format!("expected invalid indexer attempt, got {:#?}", attempt), + ); + } + Ok(()) + } + + fn check_successful_attempt( + &self, + trace: &mut Vec, + result: &Result<(), QueryEngineError>, + valid: &[&IndexerTopology], + attempt: &IndexerAttempt, + ) -> Result<(), Vec> { + if let Err(err) = result { + return Self::err_with(trace, format!("expected success, got {:?}", err)); + } + let response = match &attempt.result { + Ok(response) => response, + Err(err) => return Self::err_with(trace, format!("expected response, got {:?}", err)), + }; + if !response.payload.contains("success") { + return Self::err_with(trace, format!("expected success, got {}", response.payload)); + } + if !valid.iter().any(|indexer| attempt.indexer == indexer.id) { + return Self::err_with(trace, "response did not match any valid indexer"); + } + Ok(()) + } + + fn expect_err( + trace: &mut Vec, + result: &Result<(), QueryEngineError>, + err: QueryEngineError, + ) -> Result<(), Vec> { + match result { + Err(e) if e == &err => Ok(()), + _ => Self::err_with(trace, format!("expected {:?}, got {:?}", err, result)), + } + } + + fn err_with(trace: &mut Vec, err: S) -> Result<(), Vec> { + trace.push(err.to_string()); + Err(trace.clone()) } } @@ -555,12 +617,18 @@ struct TopologyFisherman { #[async_trait] impl FishermanInterface for TopologyFisherman { - async fn challenge(&self, indexer_query: &IndexerQuery, _: &Attestation) -> ChallengeOutcome { + async fn challenge( + &self, + indexer: &Address, + _: &Address, + _: &str, + _: &Attestation, + ) -> ChallengeOutcome { self.topology .lock() .await .indexers - .get(&indexer_query.indexing.indexer) + .get(indexer) .unwrap() .challenge_outcome } @@ -617,9 +685,9 @@ async fn test() { ); topology.lock().await.write_inputs().await; for _ in 0..100 { - let query = topology.lock().await.gen_query(); - let result = query_engine.execute_query(query.clone()).await; - let trace = match topology.lock().await.check_result(query, result).await { + let mut query = topology.lock().await.gen_query(); + let result = query_engine.execute_query(&mut query).await; + let trace = match topology.lock().await.check_result(&query, result) { Err(trace) => trace, Ok(()) => continue, };