Commit: it runs !

mangas committed Mar 15, 2024
1 parent ef09ce2 commit 1e0ded0

Showing 11 changed files with 113 additions and 44 deletions.
core/src/subgraph/inputs.rs (6 changes: 5 additions & 1 deletion)
@@ -1,7 +1,8 @@
use graph::{
blockchain::{Blockchain, TriggersAdapter},
cheap_clone::CheapClone,
components::{
store::{DeploymentLocator, SubgraphFork, WritableStore},
store::{DeploymentLocator, SubgraphFork, SubgraphStore, WritableStore},
subgraph::ProofOfIndexingVersion,
},
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
@@ -26,6 +27,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub static_filters: bool,
pub poi_version: ProofOfIndexingVersion,
pub network: String,
pub subgraph_store: Arc<dyn SubgraphStore>,

/// Whether to instrument trigger processing and log additional,
/// possibly expensive and noisy, information
@@ -50,6 +52,7 @@ impl<C: Blockchain> IndexingInputs<C> {
poi_version,
network,
instrument,
subgraph_store,
} = self;
IndexingInputs {
deployment: deployment.clone(),
@@ -67,6 +70,7 @@ impl<C: Blockchain> IndexingInputs<C> {
poi_version: *poi_version,
network: network.clone(),
instrument: *instrument,
subgraph_store: subgraph_store.cheap_clone(),
}
}
}
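
The struct change above threads an Arc<dyn SubgraphStore> through IndexingInputs, and the manual clone impl copies it with cheap_clone(). A minimal stand-alone sketch of the pattern, with a simplified CheapClone trait standing in for graph::cheap_clone::CheapClone (the stand-in types and the with_equal_fields name are illustrative assumptions, not the crate's definitions):

    use std::sync::Arc;

    /// Simplified stand-in for `graph::cheap_clone::CheapClone`: a `Clone`
    /// that is guaranteed to be cheap, e.g. an `Arc` refcount bump.
    trait CheapClone: Clone {
        fn cheap_clone(&self) -> Self {
            self.clone()
        }
    }

    // `Arc<T>` is cheap to clone even for unsized `T` such as trait objects.
    impl<T: ?Sized> CheapClone for Arc<T> {}

    trait SubgraphStore {} // stand-in for the real store trait
    struct InMemoryStore;
    impl SubgraphStore for InMemoryStore {}

    struct IndexingInputs {
        subgraph_store: Arc<dyn SubgraphStore>,
    }

    impl IndexingInputs {
        // Mirrors the manual clone in the diff: sharing the store is a
        // refcount bump, not a copy of the store itself.
        fn with_equal_fields(&self) -> IndexingInputs {
            IndexingInputs {
                subgraph_store: self.subgraph_store.cheap_clone(),
            }
        }
    }

    fn main() {
        let inputs = IndexingInputs {
            subgraph_store: Arc::new(InMemoryStore),
        };
        let copy = inputs.with_equal_fields();
        assert!(Arc::ptr_eq(&inputs.subgraph_store, &copy.subgraph_store));
    }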
core/src/subgraph/instance_manager.rs (1 change: 1 addition & 0 deletions)
@@ -441,6 +441,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
poi_version,
network,
instrument,
subgraph_store: self.subgraph_store.cheap_clone(),
};

// Initialize the indexing context, including both static and dynamic data sources.
core/src/subgraph/runner.rs (20 changes: 12 additions & 8 deletions)
@@ -204,9 +204,9 @@ where
self.ctx.filter = Some(self.build_filter());

if self.inputs.deployment.hash.to_string()
== "QmagGaBm7FL9uQWg1bk52Eb3LTN4owkvxEKkirtyXNLQc9"
== "QmcmBvMt1hbPTtPWaBk7HXwXx71tzAKfa2eZeyV2mpRBLQ"
{
self.populate_dataset();
self.populate_dataset().await;
panic!("asdasd");
}

@@ -291,18 +291,22 @@
// let db = Arc::new(sled::open(DB_NAME).map_err(SledStoreError::from).unwrap());
// self.inputs.store

let store = Arc::new(PostgresIndexerDB::new(
self.inputs.store.cheap_clone(),
self.inputs.deployment.cheap_clone(),
self.logger.cheap_clone(),
self.metrics.subgraph.stopwatch,
));
let store = Arc::new(
PostgresIndexerDB::new(
self.inputs.subgraph_store.cheap_clone(),
self.inputs.deployment.cheap_clone(),
self.logger.cheap_clone(),
self.metrics.subgraph.stopwatch.cheap_clone(),
)
.await,
);

let ctx = Arc::new(IndexerContext {
chain: self.inputs.chain.clone(),
transform: Arc::new(UniswapTransform::new()),
store,
deployment: self.inputs.deployment.clone(),
logger: self.logger.cheap_clone(),
});

let iw = IndexWorker {};
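
Two things change in this runner hunk: the store handle now comes from inputs.subgraph_store rather than inputs.store, and PostgresIndexerDB::new is evidently async now, so the .await moves inside Arc::new(...) (the stopwatch also gains a cheap_clone()). A minimal sketch of that async-constructor shape, with a hypothetical Db standing in:

    use std::sync::Arc;

    struct Db;

    impl Db {
        // An async constructor: setup work (e.g. creating the underlying
        // deployment) happens before the value exists, so `new` is a future.
        async fn new() -> Db {
            Db
        }
    }

    #[tokio::main]
    async fn main() {
        // Same shape as `Arc::new(PostgresIndexerDB::new(...).await)` above:
        // await the constructor first, then wrap the finished value.
        let db = Arc::new(Db::new().await);
        let _ = db;
    }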
graph/src/components/store/mod.rs (9 changes: 8 additions & 1 deletion)
@@ -869,7 +869,7 @@ impl DeploymentId {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct SubgraphSegmentId(pub i32);

/// A segment refers to a deployment block range. It is used to limit the scope of the store
@@ -889,6 +889,13 @@ impl Default for SubgraphSegment {
}

impl SubgraphSegment {
pub fn id(&self) -> Option<SubgraphSegmentId> {
match self {
SubgraphSegment::AllBlocks => None,
SubgraphSegment::Range(details) => Some(details.id),
}
}

pub fn details(&self) -> Option<&SegmentDetails> {
match self {
SubgraphSegment::AllBlocks => None,
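
SubgraphSegmentId now derives Default, and the new SubgraphSegment::id() returns None for the whole-range AllBlocks variant, giving callers a sentinel via unwrap_or_default() (used as a cache key in subgraph_store.rs below). A self-contained sketch of the pattern, with SegmentDetails reduced to just its id (an assumption; the real struct also carries block bounds):

    #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
    pub struct SubgraphSegmentId(pub i32);

    #[derive(Clone, Debug)]
    pub struct SegmentDetails {
        pub id: SubgraphSegmentId,
        // start/stop block bounds omitted in this sketch
    }

    #[derive(Clone, Debug)]
    pub enum SubgraphSegment {
        AllBlocks,
        Range(SegmentDetails),
    }

    impl Default for SubgraphSegment {
        fn default() -> Self {
            SubgraphSegment::AllBlocks
        }
    }

    impl SubgraphSegment {
        /// `AllBlocks` has no row in the segments table, so it has no id.
        pub fn id(&self) -> Option<SubgraphSegmentId> {
            match self {
                SubgraphSegment::AllBlocks => None,
                SubgraphSegment::Range(details) => Some(details.id),
            }
        }
    }

    fn main() {
        let segment = SubgraphSegment::default();
        // The whole range collapses to the default (zero) id when keyed.
        assert_eq!(segment.id().unwrap_or_default(), SubgraphSegmentId(0));
    }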
graph/src/indexer/mod.rs (41 changes: 37 additions & 4 deletions)
@@ -1,10 +1,13 @@
use std::time::Duration;
use std::{collections::HashMap, pin::Pin, sync::Arc};

use crate::blockchain::block_stream::BlockStreamError;
use crate::blockchain::BlockPtr;
use crate::components::store::{
DeploymentId, SegmentDetails, SubgraphSegment, SubgraphSegmentId, WritableStore,
};
use crate::util::backoff::ExponentialBackoff;
use crate::util::futures::retry;
use crate::{
blockchain::{
block_stream::{BlockStreamEvent, FirehoseCursor},
@@ -20,6 +23,7 @@ use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use borsh::{BorshDeserialize, BorshSerialize};
use futures03::{Stream, StreamExt};
use slog::Logger;
use tokio::{sync::mpsc, time::Instant};

pub mod block_stream;
@@ -147,6 +151,7 @@ pub struct IndexerContext<B: Blockchain, T: BlockTransform, S: IndexerStore> {
pub transform: Arc<T>,
pub store: Arc<S>,
pub deployment: DeploymentLocator,
pub logger: Logger,
}

impl<B: Blockchain, T: BlockTransform, S: IndexerStore> IndexerContext<B, T, S> {}
@@ -248,10 +253,17 @@ impl IndexWorker {
S: IndexerStore + 'static,
{
let chain_store = ctx.chain.chain_store();
let chain_head = chain_store
.chain_head_ptr()
.await?
.ok_or(anyhow!("Expected chain head to exist"))?;
let chain_head = forever_async(&ctx.logger, "get chain head", || {
let chain_store = chain_store.cheap_clone();
async move {
chain_store
.chain_head_ptr()
.await
.and_then(|r| r.ok_or(anyhow!("Expected chain head to exist")))
}
})
.await?;

let chain_head = chain_head.block_number() - ENV_VARS.reorg_threshold;
let stop_block = match stop_block {
Some(stop_block) => stop_block.min(chain_head),
@@ -448,3 +460,24 @@ mod tests {
}
}
}

// TODO: Re-use something
const BACKOFF_BASE: Duration = Duration::from_millis(100);
const BACKOFF_CEIL: Duration = Duration::from_secs(10);

async fn forever_async<T, F, Fut>(logger: &Logger, op: &str, f: F) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut backoff = ExponentialBackoff::new(BACKOFF_BASE, BACKOFF_CEIL);
loop {
match f().await {
Ok(v) => return Ok(v),
Err(e) => {
slog::error!(&logger, "Failed to {}, retrying...\nerror: {:?}", op, e)
}
}
backoff.sleep_async().await;
}
}
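
forever_async retries the closure until it succeeds: the Ok arm is the only way out of the loop, and the backoff sleep runs only on the error path. A stand-alone sketch of the same shape plus a caller, substituting plain tokio and anyhow for graph's ExponentialBackoff and slog (both substitutions are assumptions):

    use std::time::Duration;

    use anyhow::Result;

    const BACKOFF_BASE: Duration = Duration::from_millis(100);
    const BACKOFF_CEIL: Duration = Duration::from_secs(10);

    /// Retry `f` forever, doubling the delay (capped at `BACKOFF_CEIL`)
    /// after each failure. Mirrors `forever_async` above, minus slog.
    async fn forever_async<T, F, Fut>(op: &str, f: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let mut delay = BACKOFF_BASE;
        loop {
            match f().await {
                Ok(v) => return Ok(v),
                Err(e) => eprintln!("Failed to {op}, retrying...\nerror: {e:?}"),
            }
            tokio::time::sleep(delay).await;
            delay = (delay * 2).min(BACKOFF_CEIL);
        }
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        use std::sync::atomic::{AtomicU32, Ordering};
        static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

        // Fails twice, then succeeds; the caller only ever sees the `Ok`.
        let head = forever_async("get chain head", || async {
            if ATTEMPTS.fetch_add(1, Ordering::SeqCst) < 2 {
                anyhow::bail!("store not ready")
            }
            Ok(42u64)
        })
        .await?;
        assert_eq!(head, 42);
        Ok(())
    }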
graph/src/indexer/store/postgres.rs (18 changes: 9 additions & 9 deletions)
@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use graphql_parser::schema::SchemaDefinition;
use slog::Logger;

use crate::anyhow::Result;
@@ -12,7 +11,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::store::write::EntityModification;
use crate::components::store::{
DeploymentLocator, Entity, SegmentDetails, SubgraphSegment, SubgraphSegmentId, SubgraphStore,
DeploymentLocator, SegmentDetails, SubgraphSegment, SubgraphSegmentId, SubgraphStore,
WritableStore,
};
use crate::data::store::scalar::Bytes;
@@ -21,15 +20,15 @@ use crate::data::subgraph::LATEST_VERSION;
use crate::data::value::Word;
use crate::indexer::{BlockSender, EncodedTriggers, State};
use crate::prelude::Value;
use crate::schema::{EntityKey, InputSchema, Schema};
use crate::schema::InputSchema;
use crate::{components::store::BlockNumber, indexer::IndexerStore};

const SCHEMA: &str = "
type Triggers @entity(immutable: true) {
id ID!
number Bytes!
hash Bytes!
data Bytes!
type Trigger @entity(immutable: true) {
id: ID!
number: Int!
hash: Bytes!
data: Bytes!
}";

pub struct PostgresIndexerDB {
@@ -125,11 +124,12 @@ impl IndexerStore for PostgresIndexerDB {
.writable(
self.logger.cheap_clone(),
self.deployment.id,
SubgraphSegment::AllBlocks,
s.clone(),
Arc::new(vec![]),
)
.await?;
let data: HashMap<Word, Value> = HashMap::from_iter(vec![
(Word::from("id"), Value::Int8(bn.number as i64)),
(
Word::from("hash"),
Value::Bytes(Bytes::from(bn.hash.0.as_ref())),
@@ -1,8 +1,8 @@
-- Your SQL goes here
create table subgraphs.segments(
create table subgraphs.subgraph_segments(
id serial primary key,
deployment int4 references subgraph_deployment(id) on delete cascade,
start_block int4 not null
end_block int4 not null
current_block int4 null,
deployment int4 references subgraphs.subgraph_deployment(id) on delete cascade,
start_block int4 not null,
stop_block int4 not null,
current_block int4 null
);
store/postgres/src/deployment.rs (8 changes: 4 additions & 4 deletions)
@@ -472,7 +472,7 @@ pub fn transact_block(
let number = format!("{}::numeric", ptr.number);

let rows = update(
s::table.filter(s::id.eq(site.id)).filter(
s::table.filter(s::id.eq(segment.id.0)).filter(
// Asserts that the processing direction is forward.
s::current_block
.lt(sql(&number))
@@ -1243,7 +1243,7 @@ pub fn create_subgraph_segments(
.values((
s::deployment.eq(&deployment.0),
s::start_block.eq(&start_block),
s::end_block.eq(&end_block),
s::stop_block.eq(&end_block),
))
.on_conflict_do_nothing()
.execute(conn)
@@ -1257,7 +1257,7 @@ pub fn create_subgraph_segments(
s::id,
s::deployment,
s::start_block,
s::end_block,
s::stop_block,
s::current_block.nullable(),
))
.filter(s::deployment.eq(&deployment.0))
@@ -1283,7 +1283,7 @@ pub fn subgraph_segments(
s::id,
s::deployment,
s::start_block,
s::end_block,
s::stop_block,
s::current_block.nullable(),
))
.filter(s::deployment.eq(&deployment))
store/postgres/src/deployment_store.rs (15 changes: 13 additions & 2 deletions)
@@ -1111,13 +1111,24 @@ impl DeploymentStore {
deployment: graph::components::store::DeploymentId,
segments: Vec<SegmentDetails>,
) -> Result<Vec<SubgraphSegment>, StoreError> {
unimplemented!()
self.with_conn(move |conn, _| {
deployment::create_subgraph_segments(
conn,
deployment,
segments.into_iter().map(Into::into).collect(),
)
.map_err(CancelableError::from)
})
.await
}
pub(crate) async fn subgraph_segments(
&self,
deployment: graph::components::store::DeploymentId,
) -> Result<Vec<SubgraphSegment>, StoreError> {
unimplemented!()
self.with_conn(move |conn, _| {
deployment::subgraph_segments(conn, deployment.into()).map_err(CancelableError::from)
})
.await
}

pub(crate) fn transact_block_operations(
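
Both segment methods replace unimplemented!() with real queries handed to with_conn, which runs the blocking diesel work on a pooled connection off the async executor (the CancelableError mapping is that helper's error contract). A simplified stand-in for the shape, using tokio::task::spawn_blocking and skipping pooling and cancellation (all assumptions about internals):

    use anyhow::Result;

    struct Conn; // stand-in for a pooled diesel `PgConnection`

    struct DeploymentStore;

    impl DeploymentStore {
        /// Simplified `with_conn`: run a blocking closure on tokio's
        /// blocking thread pool with a database connection.
        async fn with_conn<T, F>(&self, f: F) -> Result<T>
        where
            T: Send + 'static,
            F: FnOnce(&mut Conn) -> Result<T> + Send + 'static,
        {
            tokio::task::spawn_blocking(move || {
                let mut conn = Conn; // real code checks one out of the pool
                f(&mut conn)
            })
            .await?
        }
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        let store = DeploymentStore;
        // Same shape as `subgraph_segments` above: move inputs into the
        // closure, query on the connection, map the error type.
        let segments = store.with_conn(move |_conn| Ok(vec![0i32])).await?;
        assert_eq!(segments, vec![0]);
        Ok(())
    }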
store/postgres/src/primary.rs (2 changes: 1 addition & 1 deletion)
@@ -99,7 +99,7 @@ table! {
id -> Integer,
deployment -> Integer,
start_block -> Integer,
end_block -> Integer,
stop_block -> Integer,
current_block -> Nullable<Integer>,
}
}
store/postgres/src/subgraph_store.rs (27 changes: 18 additions & 9 deletions)
@@ -17,7 +17,7 @@ use graph::{
server::index_node::VersionInfo,
store::{
self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait,
PruneReporter, PruneRequest, SubgraphFork, SubgraphSegment,
PruneReporter, PruneRequest, SubgraphFork, SubgraphSegment, SubgraphSegmentId,
},
},
constraint_violation,
@@ -282,7 +282,7 @@ pub struct SubgraphStoreInner {
sites: TimedCache<DeploymentHash, Site>,
placer: Arc<dyn DeploymentPlacer + Send + Sync + 'static>,
sender: Arc<NotificationSender>,
writables: Mutex<HashMap<(DeploymentId, Segment), Arc<WritableStore>>>,
writables: Mutex<HashMap<(DeploymentId, SubgraphSegmentId), Arc<WritableStore>>>,
registry: Arc<MetricsRegistry>,
}

@@ -1441,7 +1441,12 @@ impl SubgraphStoreTrait for SubgraphStore {
// We cache writables to make sure calls to this method are
// idempotent and there is ever only one `WritableStore` for any
// deployment
if let Some(writable) = self.writables.lock().unwrap().get(&deployment) {
if let Some(writable) = self
.writables
.lock()
.unwrap()
.get(&(deployment, segment.id().unwrap_or_default()))
{
// A poisoned writable will not write anything anymore; we
// discard it and create a new one that is properly initialized
// according to the state in the database.
@@ -1463,16 +1468,16 @@
self.as_ref().clone(),
logger,
site,
segment,
segment.clone(),
manifest_idx_and_name,
self.registry.clone(),
)
.await?,
);
self.writables
.lock()
.unwrap()
.insert(deployment, writable.cheap_clone());
self.writables.lock().unwrap().insert(
(deployment, segment.id().unwrap_or_default()),
writable.cheap_clone(),
);
Ok(writable)
}

@@ -1481,7 +1486,11 @@

// Remove the writable from the cache and stop it
let deployment = loc.id.into();
let writable = self.writables.lock().unwrap().remove(&deployment);
let writable = self
.writables
.lock()
.unwrap()
.remove(&(deployment, SubgraphSegmentId::default()));
match writable {
Some(writable) => writable.stop().await,
None => Ok(()),
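
The writables cache key widens from DeploymentId to (DeploymentId, SubgraphSegmentId), with AllBlocks collapsing to the default id via segment.id().unwrap_or_default(); note that the stop path above still removes only the default-segment entry. A condensed sketch of the keying scheme (all types reduced to stand-ins):

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    type DeploymentId = i32; // stand-in for the real id type

    #[derive(Copy, Clone, Default, PartialEq, Eq, Hash)]
    struct SubgraphSegmentId(i32);

    struct WritableStore; // stand-in

    #[derive(Default)]
    struct SubgraphStore {
        writables: Mutex<HashMap<(DeploymentId, SubgraphSegmentId), Arc<WritableStore>>>,
    }

    impl SubgraphStore {
        /// One cached writable per (deployment, segment). `segment_id` is
        /// `None` for `AllBlocks`, which maps to the default id and so
        /// preserves the old one-writable-per-deployment behaviour.
        fn writable(
            &self,
            deployment: DeploymentId,
            segment_id: Option<SubgraphSegmentId>,
        ) -> Arc<WritableStore> {
            let key = (deployment, segment_id.unwrap_or_default());
            self.writables
                .lock()
                .unwrap()
                .entry(key)
                .or_insert_with(|| Arc::new(WritableStore))
                .clone()
        }
    }

    fn main() {
        let store = SubgraphStore::default();
        let a = store.writable(1, None);
        let b = store.writable(1, None);
        let c = store.writable(1, Some(SubgraphSegmentId(7)));
        assert!(Arc::ptr_eq(&a, &b)); // same deployment and segment
        assert!(!Arc::ptr_eq(&a, &c)); // each segment gets its own writable
    }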