Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/fisherman_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{indexer_client::Attestation, indexer_selection::IndexerQuery, prelude::*};
use crate::{indexer_client::Attestation, prelude::*};
use async_trait::async_trait;
use reqwest;
use serde::Deserialize;
Expand All @@ -19,7 +19,9 @@ pub enum ChallengeOutcome {
pub trait FishermanInterface {
async fn challenge(
&self,
indexer_query: &IndexerQuery,
indexer: &Address,
allocation: &Address,
indexer_query: &str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is indexer_query different from plain query?

attestation: &Attestation,
) -> ChallengeOutcome;
}
Expand All @@ -34,10 +36,15 @@ pub struct FishermanClient {
impl FishermanInterface for FishermanClient {
async fn challenge(
&self,
indexer_query: &IndexerQuery,
indexer: &Address,
allocation: &Address,
indexer_query: &str,
attestation: &Attestation,
) -> ChallengeOutcome {
match self.send_challenge(&indexer_query, &attestation).await {
match self
.send_challenge(indexer, allocation, indexer_query, attestation)
.await
{
Ok(outcome) => outcome,
Err(fisherman_challenge_err) => {
tracing::error!(%fisherman_challenge_err);
Expand All @@ -54,20 +61,22 @@ impl FishermanClient {

async fn send_challenge(
&self,
indexer_query: &IndexerQuery,
indexer: &Address,
allocation: &Address,
indexer_query: &str,
attestation: &Attestation,
) -> Result<ChallengeOutcome, Box<dyn Error>> {
let challenge = serde_json::to_string(&json!({
"jsonrpc": "2.0",
"id": 0,
"method": "challenge",
"params": {
"readOperation": &indexer_query.query,
"allocationID": indexer_query.allocation.to_string(),
"readOperation": indexer_query,
"allocationID": allocation.to_string(),
"attestation": serde_json::to_value(attestation)?,
},
}))?;
tracing::trace!(%challenge);
tracing::trace!(%indexer, %challenge);
self.client
.post(self.url.clone())
.header("Content-Type", "application/json")
Expand Down
2 changes: 1 addition & 1 deletion src/indexer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub trait IndexerInterface {
async fn query_indexer(&self, query: &IndexerQuery) -> Result<IndexerResponse, IndexerError>;
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct IndexerResponse {
pub status: u16,
pub payload: String,
Expand Down
6 changes: 3 additions & 3 deletions src/indexer_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl From<BadIndexerReason> for SelectionError {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum IndexerError {
Timeout,
NoAttestation,
Expand Down Expand Up @@ -259,7 +259,7 @@ impl Indexers {
pub async fn observe_indexing_behind(
&self,
context: &mut Context<'_>,
query: &IndexerQuery,
indexing: &Indexing,
block_resolver: &BlockResolver,
) {
// Get this early to be closer to the time when the query was made so
Expand All @@ -268,7 +268,7 @@ impl Indexers {
let latest = block_resolver.latest_block().map(|b| b.number).unwrap_or(0);
let freshness_requirements =
freshness_requirements(&mut context.operations, block_resolver).await;
let selection_factors = match self.indexings.get(&query.indexing).await {
let selection_factors = match self.indexings.get(indexing).await {
Some(selection_factors) => selection_factors,
None => return,
};
Expand Down
176 changes: 105 additions & 71 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,25 @@ struct SubgraphQueryData {
}

impl SubgraphQueryData {
fn resolve_subgraph_deployment(&self, subgraph_identifier: &str) -> Option<Ptr<SubgraphInfo>> {
let deployment = if let Ok(subgraph) = subgraph_identifier.parse::<SubgraphID>() {
fn resolve_subgraph_deployment(
&self,
params: &actix_web::dev::Path<actix_web::dev::Url>,
) -> Result<SubgraphDeploymentID, String> {
if let Some(id) = params.get("subgraph_id") {
let subgraph = id
.parse::<SubgraphID>()
.ok()
.ok_or_else(|| id.to_string())?;
self.inputs
.current_deployments
.value_immediate()
.and_then(|map| map.get(&subgraph).cloned())?
.and_then(|map| map.get(&subgraph).cloned())
.ok_or_else(|| id.to_string())
} else if let Some(id) = params.get("deployment_id") {
SubgraphDeploymentID::from_ipfs_hash(id).ok_or_else(|| id.to_string())
} else {
SubgraphDeploymentID::from_ipfs_hash(&subgraph_identifier)?
};
self.subgraph_info
.value_immediate()
.and_then(|map| map.get(&deployment)?.value_immediate())
Err("".to_string())
}
}
}

Expand All @@ -364,54 +371,91 @@ async fn handle_subgraph_query(
payload: web::Json<QueryBody>,
data: web::Data<SubgraphQueryData>,
) -> HttpResponse {
let url_params = request.match_info();

let subgraph_info = match url_params
.get("subgraph_id")
.and_then(|id| data.resolve_subgraph_deployment(id))
{
Some(subgraph) => subgraph,
None => {
return graphql_error_response(StatusCode::BAD_REQUEST, "Invalid subgraph identifier")
}
};

let t0 = Instant::now();
let query_id = QueryID::new();
let ray_id = request
.headers()
.get("cf-ray")
.and_then(|value| value.to_str().ok())
.unwrap_or("")
.to_string();
let span =
tracing::info_span!("handle_subgraph_query", %ray_id, %query_id, %subgraph_info.deployment);
let response = handle_subgraph_query_inner(request, payload, data, query_id, subgraph_info)
.instrument(span.clone())
let variables = payload.variables.as_ref().map(ToString::to_string);
let mut query = Query::new(ray_id, payload.into_inner().query, variables);
// We check that the requested subgraph is valid now, since we don't want to log query info for
// unknown subgraphs requests.
let deployment = match data.resolve_subgraph_deployment(request.match_info()) {
Ok(subgraph) => subgraph,
Err(invalid_subgraph) => {
tracing::info!(%invalid_subgraph);
return graphql_error_response(StatusCode::BAD_REQUEST, "Invalid subgraph identifier");
}
};
query.subgraph = data
.subgraph_info
.value_immediate()
.and_then(|map| map.get(&deployment)?.value_immediate());
if query.subgraph == None {
tracing::info!(%deployment);
return graphql_error_response(StatusCode::NOT_FOUND, "Subgraph not found");
}
let span = tracing::info_span!(
"handle_subgraph_query",
ray_id = %query.ray_id,
query_id = %query.id,
deployment = %query.subgraph.as_ref().unwrap().deployment,
network = %query.subgraph.as_ref().unwrap().network,
);
let response = handle_subgraph_query_inner(request, data, &mut query)
.instrument(span)
.await;
let response_time = Instant::now() - t0;
let (payload, status) = match response {
Ok(payload) => {
let status = payload.status().to_string();
(payload, status)
}
Err((status, msg)) => (graphql_error_response(status, &msg), msg),
Err((status, msg)) => (
graphql_error_response(status, &msg),
format!("{}: {}", status, msg),
),
};
tracing::info!(
parent: &span,
ray_id = %query.ray_id,
query_id = %query.id,
deployment = %query.subgraph.as_ref().unwrap().deployment,
network = %query.subgraph.as_ref().unwrap().network,
api_key = %query.api_key.as_ref().unwrap().key,
query = %query.query,
variables = %query.variables.as_deref().unwrap_or_default(),
response_time_ms = (Instant::now() - query.start_time).as_millis() as u32,
%status,
response_time_ms = response_time.as_millis() as u32,
"client query result",
"Client query result",
);
for (attempt_index, attempt) in query.indexer_attempts.iter().enumerate() {
let status = match &attempt.result {
Ok(response) => response.status.to_string(),
Err(err) => err.to_string(),
};
tracing::info!(
ray_id = %query.ray_id,
query_id = %query.id,
attempt_index,
indexer = %attempt.indexer,
allocation = %attempt.allocation,
fee = %attempt.score.fee,
utility = *attempt.score.utility,
blocks_behind = attempt.score.blocks_behind,
response_time_ms = attempt.duration.as_millis() as u32,
%status,
rejection = %attempt.rejection.as_deref().unwrap_or_default(),
"Indexer attempt",
);
}

payload
}

async fn handle_subgraph_query_inner(
request: HttpRequest,
payload: web::Json<QueryBody>,
data: web::Data<SubgraphQueryData>,
query_id: QueryID,
subgraph: Ptr<SubgraphInfo>,
query: &mut Query,
) -> Result<HttpResponse, (StatusCode, String)> {
let query_engine = QueryEngine::new(
data.config.clone(),
Expand Down Expand Up @@ -455,60 +499,50 @@ async fn handle_subgraph_query_inner(
with_metric(&METRICS.unauthorized_domain, &[&api_key.key], |c| c.inc());
return Err((StatusCode::OK, "Domain not authorized by API key".into()));
}
if !api_key.deployments.is_empty() && !api_key.deployments.contains(&subgraph.deployment) {
let deployment = &query.subgraph.as_ref().unwrap().deployment.clone();
if !api_key.deployments.is_empty() && !api_key.deployments.contains(&deployment) {
with_metric(
&METRICS.queries_unauthorized_deployment,
&[&api_key.key],
|counter| counter.inc(),
);
return Err((StatusCode::OK, "Subgraph not authorized by API key".into()));
}

let query = ClientQuery {
id: query_id,
api_key: api_key.clone(),
query: payload.query.clone(),
variables: payload.variables.as_ref().map(ToString::to_string),
subgraph: subgraph.clone(),
};
let result = match query_engine.execute_query(query).await {
Ok(result) => result,
Err(err) => {
return Err((
StatusCode::OK,
match err {
QueryEngineError::MalformedQuery => "Invalid query".into(),
QueryEngineError::NoIndexers => {
"No indexers found for subgraph deployment".into()
}
QueryEngineError::NoIndexerSelected => {
"No suitable indexer found for subgraph deployment".into()
}
QueryEngineError::FeesTooHigh(count) => {
format!("No suitable indexer found, {} indexers requesting higher fees for this query", count)
}
QueryEngineError::MissingBlock(_) => {
"Gateway failed to resolve required blocks".into()
}
},
))
}
};
if let Err(err) = query_engine.execute_query(query).await {
return Err((
StatusCode::OK,
match err {
QueryEngineError::MalformedQuery => "Invalid query".into(),
QueryEngineError::NoIndexers => "No indexers found for subgraph deployment".into(),
QueryEngineError::NoIndexerSelected => {
"No suitable indexer found for subgraph deployment".into()
}
QueryEngineError::FeesTooHigh(count) => {
format!("No suitable indexer found, {} indexers requesting higher fees for this query", count)
}
QueryEngineError::MissingBlock(_) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sake of logging these can be status codes I think.

"Gateway failed to resolve required blocks".into()
}
},
));
}
let last_attempt = query.indexer_attempts.last().unwrap();
let response = last_attempt.result.as_ref().unwrap();
if let Ok(hist) = METRICS
.query_result_size
.get_metric_with_label_values(&[&result.query.indexing.deployment.ipfs_hash()])
.get_metric_with_label_values(&[&deployment.ipfs_hash()])
{
hist.observe(result.response.payload.len() as f64);
hist.observe(response.payload.len() as f64);
}
let _ = data.stats_db.send(stats_db::Msg::AddQuery {
api_key,
fee: result.query.score.fee,
fee: last_attempt.score.fee,
domain: domain.to_string(),
subgraph: subgraph.deployment.ipfs_hash(),
subgraph: query.subgraph.as_ref().unwrap().deployment.ipfs_hash(),
});
Ok(HttpResponseBuilder::new(StatusCode::OK)
.insert_header(header::ContentType::json())
.body(result.response.payload))
.body(&response.payload))
}

pub fn graphql_error_response<S: ToString>(status: StatusCode, message: S) -> HttpResponse {
Expand Down
Loading