Skip to content

Commit

Permalink
feat: Paid query flow (partial impl)
Browse files Browse the repository at this point in the history
  • Loading branch information
aasseman committed Aug 11, 2023
1 parent f8b59c1 commit 20aa6f9
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 50 deletions.
189 changes: 152 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions native/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use eip_712_derive::{
use secp256k1::SecretKey;
use std::convert::TryInto;

#[derive(Debug, Clone)]
pub struct AttestationSigner {
subgraph_deployment_id: Bytes32,
domain_separator: DomainSeparator,
Expand Down
1 change: 1 addition & 0 deletions service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ metrics-exporter-prometheus = "0.11.0"
prometheus = "0.13.3"
hex = "0.4.3"
bs58 = "0.5.0"
tap_core = "0.3.0"

# [[bin]]
# name = "indexer-native"
Expand Down
84 changes: 84 additions & 0 deletions service/src/common/allocation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use ethers::signers::coins_bip39::English;
use ethers::signers::MnemonicBuilder;
use ethers::signers::Signer;
use ethers::signers::Wallet;
use ethers_core::k256::ecdsa::SigningKey;
use ethers_core::types::Address;

use crate::query_processor::SubgraphDeploymentID;

// TODO: Lots of fields are commented out for now, but will be needed later.

pub struct Allocation {
pub id: Address,
// pub status: AllocationStatus,
pub subgraph_deployment: SubgraphDeploymentID,
// pub indexer: Address,
// pub allocated_tokens: U256,
pub created_at_epoch: u64,
// pub created_at_block_hash: String,
// pub closed_at_epoch: u64,
// pub closed_at_epoch_start_block_hash: Option<String>,
// pub previous_epoch_start_block_hash: Option<String>,
// pub closed_at_block_hash: String,
// pub poi: Option<String>,
// pub query_fee_rebates: U256,
// pub query_fees_collected: U256,
}

// pub enum AllocationStatus {
// Null,
// Active,
// Closed,
// Finalized,
// Claimed,
// }

pub fn derive_key_pair(
indexer_mnemonic: &str,
epoch: u64,
deployment: &SubgraphDeploymentID,
index: u64,
) -> Wallet<SigningKey> {
let mut derivation_path = format!("m/{}/", epoch);
derivation_path.push_str(
&deployment
.ipfs_hash()
.as_bytes()
.iter()
.map(|char| char.to_string())
.collect::<Vec<String>>()
.join("/"),
);
derivation_path.push_str(format!("/{}", index).as_str());

MnemonicBuilder::<English>::default()
.derivation_path(&derivation_path)
.expect("Valid derivation path")
.phrase(indexer_mnemonic)
.build()
.unwrap()
}

pub fn allocation_signer(indexer_mnemonic: &str, allocation: Allocation) -> Wallet<SigningKey> {
// Guess the allocation index by enumerating all indexes in the
// range [0, 100] and checking for a match
for i in 0..100 {
// The allocation was either created at the epoch it intended to or one
// epoch later. So try both both.
for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] {
let allocation_wallet = derive_key_pair(
indexer_mnemonic,
created_at_epoch,
&allocation.subgraph_deployment,
i,
);
if allocation_wallet.address() == allocation.id {
return allocation_wallet;
}
}
}
panic!("Could not find allocation signer")
}

// TODO: tests!
1 change: 1 addition & 0 deletions service/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub mod address;
pub mod allocation;
pub mod database;
pub mod indexer_error;
// pub mod query_fee_models;
Expand Down
2 changes: 1 addition & 1 deletion service/src/graph_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl GraphNodeInstance {
let request = self
.client
.post(format!("{}/subgraphs/id/{}", self.base_url, endpoint))
.body(data.clone())
.body(data)
.header(header::CONTENT_TYPE, "application/json");

let response = request.send().await?;
Expand Down
72 changes: 65 additions & 7 deletions service/src/query_processor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;

use ethers_core::types::Address;
use ethers_core::types::{Signature, U256};
use log::error;
use native::attestation::AttestationSigner;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tap_core::tap_manager::SignedReceipt;

use crate::graph_node::GraphNodeInstance;

Expand Down Expand Up @@ -92,13 +98,6 @@ impl ToString for SubgraphDeploymentID {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Signature {
v: i64,
r: String,
s: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
#[serde(rename = "graphQLResponse")]
Expand Down Expand Up @@ -128,6 +127,13 @@ pub struct FreeQuery {
pub query: String,
}

/// Paid query needs subgraph_deployment_id, query, receipt
pub struct PaidQuery {
pub subgraph_deployment_id: SubgraphDeploymentID,
pub query: String,
pub receipt: String,
}

#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error(transparent)]
Expand All @@ -146,6 +152,7 @@ pub struct QueryProcessor {
base: Url,
graph_node: GraphNodeInstance,
network_subgraph: Url,
signers: HashMap<Address, AttestationSigner>,
}

impl QueryProcessor {
Expand All @@ -158,6 +165,8 @@ impl QueryProcessor {
graph_node,
network_subgraph: Url::parse(network_subgraph_endpoint)
.expect("Could not parse graph node endpoint"),
// TODO: populate signers
signers: HashMap::new(),
}
}

Expand Down Expand Up @@ -190,6 +199,55 @@ impl QueryProcessor {
status: 200,
})
}

pub async fn execute_paid_query(
&self,
query: PaidQuery,
) -> Result<Response<QueryResult>, QueryError> {
let PaidQuery {
subgraph_deployment_id,
query,
receipt,
} = query;

// TODO: Emit IndexerErrorCode::IE031 on error
let parsed_receipt: SignedReceipt = serde_json::from_str(&receipt)
.map_err(|e| QueryError::Other(anyhow::Error::from(e)))?;

let allocation_id = parsed_receipt.message.allocation_id;

// TODO: Handle the TAP receipt

let signer = self.signers.get(&allocation_id).ok_or_else(|| {
QueryError::Other(anyhow::anyhow!(
"No signer found for allocation id {}",
allocation_id
))
})?;

let response = self
.graph_node
.subgraph_query(&subgraph_deployment_id.ipfs_hash(), query.clone())
.await?;

let attestation_signature = response.attestable.then(|| {
// TODO: Need to check correctness of this. In particular, the endianness.
let attestation = signer.create_attestation(&query, &response.graphql_response);
Signature {
r: U256::from_big_endian(&attestation.r),
s: U256::from_big_endian(&attestation.s),
v: attestation.v as u64,
}
});

Ok(Response {
result: QueryResult {
graphql_response: response.graphql_response,
attestation: attestation_signature,
},
status: 200,
})
}
}

#[cfg(test)]
Expand Down
33 changes: 28 additions & 5 deletions service/src/server/routes/subgraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ pub async fn subgraph_queries(
id: axum::extract::Path<String>,
req: Request<axum::body::Body>,
) -> impl IntoResponse {
let (parts, body) = req.into_parts();

// Extract scalar receipt from header and free query auth token for paid or free query
let receipt = if let Some(recipt) = req.headers().get("scalar-receipt") {
match recipt.to_str() {
let receipt = if let Some(receipt) = parts.headers.get("scalar-receipt") {
match receipt.to_str() {
Ok(r) => Some(r),
Err(_) => {
return bad_request_response("Bad scalar receipt for subgraph query");
Expand All @@ -43,16 +45,16 @@ pub async fn subgraph_queries(
);

// Extract free query auth token
let auth_token = req
.headers()
let auth_token = parts
.headers
.get(http::header::AUTHORIZATION)
.and_then(|t| t.to_str().ok());
// determine if the query is paid or authenticated to be free
let free = auth_token.is_some()
&& server.free_query_auth_token.is_some()
&& auth_token.unwrap() == server.free_query_auth_token.as_deref().unwrap();

let query_string = match response_body_to_query_string(req.into_body()).await {
let query_string = match response_body_to_query_string(body).await {
Ok(q) => q,
Err(e) => return bad_request_response(&e.to_string()),
};
Expand All @@ -68,17 +70,38 @@ pub async fn subgraph_queries(
subgraph_deployment_id,
query: query_string,
};

// TODO: Emit IndexerErrorCode::IE033 on error
let res = server
.query_processor
.execute_free_query(free_query)
.await
.expect("Failed to execute free query");

match res.status {
200 => (StatusCode::OK, Json(res.result)).into_response(),
_ => bad_request_response("Bad response from Graph node"),
}
} else if receipt.is_some() {
let paid_query = crate::query_processor::PaidQuery {
subgraph_deployment_id,
query: query_string,
receipt: receipt.unwrap().to_string(),
};

// TODO: Emit IndexerErrorCode::IE032 on error
let res = server
.query_processor
.execute_paid_query(paid_query)
.await
.expect("Failed to execute paid query");

match res.status {
200 => (StatusCode::OK, Json(res.result)).into_response(),
_ => bad_request_response("Bad response from Graph node"),
}
} else {
// TODO: emit IndexerErrorCode::IE030 on missing receipt
let error_body = "Query request header missing scalar-receipts or incorrect auth token";
bad_request_response(error_body)
}
Expand Down

0 comments on commit 20aa6f9

Please sign in to comment.