Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/core/common/src/catalog/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::{
};
use datasets_common::{
deps::{alias::DepAlias, reference::DepReference},
hash_reference::HashReference,
partial_reference::PartialReference,
table_name::TableName,
};
Expand All @@ -21,7 +22,7 @@ use crate::{BlockNum, SPECIAL_BLOCK_NUM, js_udf::JsUdf, sql::TableReference};
/// Identifies a dataset and its data schema.
#[derive(Clone, Debug)]
pub struct Dataset {
pub manifest_hash: datasets_common::hash::Hash,
pub reference: HashReference,
pub dependencies: BTreeMap<DepAlias, DepReference>,
pub kind: String,
pub network: Option<String>,
Expand Down Expand Up @@ -76,8 +77,8 @@ impl Dataset {
})
}

pub fn manifest_hash(&self) -> &datasets_common::hash::Hash {
&self.manifest_hash
pub fn reference(&self) -> &HashReference {
&self.reference
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/common/src/catalog/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl PhysicalTable {
store: DataStore,
table: ResolvedTable,
) -> Result<Option<Self>, BoxError> {
let manifest_hash = table.dataset().manifest_hash();
let manifest_hash = table.dataset().reference().hash();
let table_name = table.name();

let Some(db_row) = store
Expand Down
4 changes: 2 additions & 2 deletions crates/core/dataset-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ pub enum EthCallForDatasetError {
///
/// This occurs when an EVM RPC dataset definition does not include the network
/// field, which is required to determine the appropriate provider configuration.
#[error("Dataset '{manifest_hash}' is missing required 'network' field for EvmRpc kind")]
MissingNetwork { manifest_hash: String },
#[error("Dataset '{reference}' is missing required 'network' field for EvmRpc kind")]
MissingNetwork { reference: HashReference },

/// No provider configuration found for the dataset kind and network combination.
///
Expand Down
47 changes: 20 additions & 27 deletions crates/core/dataset-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ pub struct DatasetStore {
provider_configs_store: ProviderConfigsStore,
// Store for dataset definitions (manifests).
dataset_manifests_store: DatasetManifestsStore,
// Cache maps dataset name to eth_call UDF.
eth_call_cache: Arc<RwLock<HashMap<String, ScalarUDF>>>,
// This cache maps dataset name to the dataset definition.
dataset_cache: Arc<RwLock<HashMap<Hash, Arc<Dataset>>>>,
// Cache maps HashReference to eth_call UDF.
eth_call_cache: Arc<RwLock<HashMap<HashReference, ScalarUDF>>>,
// This cache maps HashReference to the dataset definition.
dataset_cache: Arc<RwLock<HashMap<HashReference, Arc<Dataset>>>>,
}

impl DatasetStore {
Expand Down Expand Up @@ -649,23 +649,19 @@ impl DatasetStore {
&self,
reference: &HashReference,
) -> Result<Arc<Dataset>, GetDatasetError> {
let namespace = reference.namespace();
let name = reference.name();
let hash = reference.hash();

// Check cache using manifest hash as the key
if let Some(dataset) = self.dataset_cache.read().get(hash).cloned() {
tracing::trace!(manifest_hash = %hash, "Cache hit, returning cached dataset");
// Check cache using HashReference as the key
if let Some(dataset) = self.dataset_cache.read().get(reference).cloned() {
tracing::trace!(dataset = %reference.short_display(), "Cache hit, returning cached dataset");
tracing::debug!(
dataset_namespace = %namespace,
dataset_name = %name,
manifest_hash = %hash,
dataset = %reference.short_display(),
"Dataset loaded successfully"
);
return Ok(dataset);
}

tracing::debug!(manifest_hash = %hash, "Cache miss, loading from store");
tracing::debug!(dataset = %reference.short_display(), "Cache miss, loading from store");

// Get the manifest path from metadata database
let Some(path) = metadata_db::manifests::get_path(&self.metadata_db, hash)
Expand Down Expand Up @@ -715,7 +711,7 @@ impl DatasetStore {
kind,
source,
})?;
evm_rpc_datasets::dataset(hash.clone(), manifest)
evm_rpc_datasets::dataset(reference.clone(), manifest)
}
DatasetKind::Solana => {
let manifest = manifest_content
Expand All @@ -725,7 +721,7 @@ impl DatasetStore {
kind,
source,
})?;
solana_datasets::dataset(hash.clone(), manifest)
solana_datasets::dataset(reference.clone(), manifest)
}
DatasetKind::EthBeacon => {
let manifest = manifest_content
Expand All @@ -735,7 +731,7 @@ impl DatasetStore {
kind,
source,
})?;
eth_beacon_datasets::dataset(hash.clone(), manifest)
eth_beacon_datasets::dataset(reference.clone(), manifest)
}
DatasetKind::Firehose => {
let manifest = manifest_content
Expand All @@ -745,7 +741,7 @@ impl DatasetStore {
kind,
source,
})?;
firehose_datasets::evm::dataset(hash.clone(), manifest)
firehose_datasets::evm::dataset(reference.clone(), manifest)
}
DatasetKind::Derived => {
let manifest = manifest_content
Expand All @@ -755,7 +751,7 @@ impl DatasetStore {
kind,
source,
})?;
datasets_derived::dataset(hash.clone(), manifest).map_err(|source| {
datasets_derived::dataset(reference.clone(), manifest).map_err(|source| {
GetDatasetError::CreateDerivedDataset {
reference: reference.clone(),
source,
Expand All @@ -769,12 +765,10 @@ impl DatasetStore {
// Cache the dataset
self.dataset_cache
.write()
.insert(hash.clone(), dataset.clone());
.insert(reference.clone(), dataset.clone());

tracing::debug!(
dataset_namespace = %namespace,
dataset_name = %name,
manifest_hash = %hash,
dataset = %reference.short_display(),
"Dataset loaded successfully"
);

Expand Down Expand Up @@ -1007,19 +1001,18 @@ impl DatasetStore {
}

// Check if we already have the provider cached.
let cache_key = dataset.manifest_hash().as_str();
if let Some(udf) = self.eth_call_cache.read().get(cache_key) {
if let Some(udf) = self.eth_call_cache.read().get(dataset.reference()) {
return Ok(Some(udf.clone()));
}

// Load the provider from the dataset definition.
let Some(network) = &dataset.network else {
tracing::warn!(
manifest_hash = %dataset.manifest_hash(),
dataset = %dataset.reference().short_display(),
"dataset is missing required 'network' field for evm-rpc kind"
);
return Err(EthCallForDatasetError::MissingNetwork {
manifest_hash: dataset.manifest_hash().to_string(),
reference: dataset.reference().clone(),
});
};

Expand Down Expand Up @@ -1064,7 +1057,7 @@ impl DatasetStore {
// Cache the EthCall UDF
self.eth_call_cache
.write()
.insert(cache_key.to_string(), udf.clone());
.insert(dataset.reference().clone(), udf.clone());

Ok(Some(udf))
}
Expand Down
7 changes: 3 additions & 4 deletions crates/core/datasets-derived/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use common::{
use datafusion::sql::parser;
use datasets_common::{
deps::alias::{DepAlias, DepAliasError, DepAliasOrSelfRef, DepAliasOrSelfRefError},
hash::Hash,
hash_reference::HashReference,
table_name::TableName,
};
Expand All @@ -47,8 +46,8 @@ use crate::{
///
/// This function transforms a derived dataset manifest with its tables, functions, and metadata
/// into the internal `Dataset` structure used by the query engine. Dataset identity (namespace,
/// name, version, manifest_hash) must be provided externally as they are not part of the manifest.
pub fn dataset(manifest_hash: Hash, manifest: Manifest) -> Result<Dataset, DatasetError> {
/// name, version, hash reference) must be provided externally as they are not part of the manifest.
pub fn dataset(reference: HashReference, manifest: Manifest) -> Result<Dataset, DatasetError> {
let queries = {
let mut queries = BTreeMap::new();
for (table_name, table) in &manifest.tables {
Expand Down Expand Up @@ -89,7 +88,7 @@ pub fn dataset(manifest_hash: Hash, manifest: Manifest) -> Result<Dataset, Datas
.collect();

Ok(Dataset {
manifest_hash,
reference,
dependencies: manifest.dependencies,
kind: DerivedDatasetKind.to_string(),
network: None,
Expand Down
6 changes: 3 additions & 3 deletions crates/core/dump/src/streaming_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl StreamingQuery {
let network = tables.iter().map(|t| t.network()).next().unwrap();
let src_datasets = tables
.iter()
.map(|t| (t.dataset().manifest_hash().clone(), t.dataset().clone()))
.map(|t| (t.dataset().reference().hash().clone(), t.dataset().clone()))
.collect();
let blocks_table =
resolve_blocks_table(dataset_store, data_store.clone(), src_datasets, network).await?;
Expand Down Expand Up @@ -713,7 +713,7 @@ async fn resolve_blocks_table(

// TODO: Have a dataset name here that is not made up.
let dataset_name = Name::try_from("blocks_table".to_string())?;
let manifest_hash = dataset.manifest_hash().clone();
let manifest_hash = dataset.reference().hash().clone();
let reference = PartialReference::new(
None,
dataset_name,
Expand Down Expand Up @@ -750,7 +750,7 @@ async fn search_dependencies_for_raw_dataset(
let mut visited = BTreeSet::new();

while let Some(dataset) = queue.pop_front() {
let hash = dataset.manifest_hash().clone();
let hash = dataset.reference().hash().clone();

// Skip duplicates
if !visited.insert(hash) {
Expand Down
7 changes: 4 additions & 3 deletions crates/extractors/eth-beacon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, num::NonZeroU32};

use common::{BlockNum, Dataset};
use datasets_common::hash_reference::HashReference;
use reqwest::Url;

mod block;
Expand Down Expand Up @@ -65,12 +66,12 @@ pub struct ProviderConfig {

/// Convert an Eth Beacon manifest into a logical dataset representation.
///
/// Dataset identity (namespace, name, version, manifest_hash) must be provided externally as they
/// Dataset identity (namespace, name, version, hash reference) must be provided externally as they
/// are not part of the manifest.
pub fn dataset(manifest_hash: datasets_common::hash::Hash, manifest: Manifest) -> Dataset {
pub fn dataset(reference: HashReference, manifest: Manifest) -> Dataset {
let network = manifest.network;
Dataset {
manifest_hash,
reference,
dependencies: BTreeMap::new(),
kind: manifest.kind.to_string(),
start_block: Some(manifest.start_block),
Expand Down
7 changes: 4 additions & 3 deletions crates/extractors/evm-rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf};

use common::{BlockNum, BoxError, Dataset};
use datasets_common::hash_reference::HashReference;
use serde_with::serde_as;
use url::Url;

Expand Down Expand Up @@ -80,12 +81,12 @@ pub struct ProviderConfig {

/// Convert an EVM RPC manifest into a logical dataset representation.
///
/// Dataset identity (namespace, name, version, manifest_hash) must be provided externally as they
/// Dataset identity (namespace, name, version, hash reference) must be provided externally as they
/// are not part of the manifest.
pub fn dataset(manifest_hash: datasets_common::hash::Hash, manifest: Manifest) -> Dataset {
pub fn dataset(reference: HashReference, manifest: Manifest) -> Dataset {
let network = manifest.network;
Dataset {
manifest_hash,
reference,
dependencies: BTreeMap::new(),
kind: manifest.kind.to_string(),
start_block: Some(manifest.start_block),
Expand Down
7 changes: 4 additions & 3 deletions crates/extractors/firehose/src/evm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;

use common::Dataset;
use datasets_common::hash_reference::HashReference;

use crate::dataset::Manifest;
pub use crate::proto::sf::ethereum::r#type::v2 as pbethereum;
Expand All @@ -9,12 +10,12 @@ pub mod tables;

/// Convert a Firehose manifest into a logical dataset representation.
///
/// Dataset identity (namespace, name, version, manifest_hash) must be provided externally as they
/// Dataset identity (namespace, name, version, hash reference) must be provided externally as they
/// are not part of the manifest.
pub fn dataset(manifest_hash: datasets_common::hash::Hash, manifest: Manifest) -> Dataset {
pub fn dataset(reference: HashReference, manifest: Manifest) -> Dataset {
let network = manifest.network;
Dataset {
manifest_hash,
reference,
dependencies: BTreeMap::new(),
kind: manifest.kind.to_string(),
start_block: Some(manifest.start_block),
Expand Down
10 changes: 5 additions & 5 deletions crates/extractors/solana/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf};

use common::{BlockNum, BoxError, Dataset};
use datasets_common::manifest::TableSchema;
use datasets_common::{hash_reference::HashReference, manifest::TableSchema};
use serde_with::serde_as;
use url::Url;

Expand Down Expand Up @@ -84,11 +84,11 @@ pub struct ProviderConfig {

/// Convert a Solana manifest into a logical dataset representation.
///
/// Dataset identity (namespace, name, version) must be provided externally as they are not part
/// of the manifest.
pub fn dataset(manifest_hash: datasets_common::hash::Hash, manifest: Manifest) -> Dataset {
/// Dataset identity (namespace, name, version, hash reference) must be provided externally as they
/// are not part of the manifest.
pub fn dataset(reference: HashReference, manifest: Manifest) -> Dataset {
Dataset {
manifest_hash,
reference,
dependencies: BTreeMap::new(),
kind: manifest.kind.to_string(),
start_block: Some(manifest.start_block),
Expand Down