Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions crates/bin/dump-check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ pub async fn dump_check(
) -> Result<(), BoxError> {
let dataset_version = dataset_version.map(|v| v.parse()).transpose()?;
let dataset = dataset_store
.load_dataset(&dataset_name, dataset_version.as_ref())
.await?;
.get_dataset(&dataset_name, dataset_version.as_ref())
.await?
.ok_or_else(|| format!("Dataset '{}' not found", dataset_name))?;
let client = dataset_store
.load_client(&dataset_name, false, metrics.as_ref().map(|m| &m.meter))
.await?;
.get_client(
&dataset_name,
dataset_version.as_ref(),
false,
metrics.as_ref().map(|m| &m.meter),
)
.await?
.ok_or_else(|| format!("Client for dataset '{}' not found", dataset_name))?;
let total_blocks = end_block - start + 1;
let mut tables: Vec<Arc<PhysicalTable>> = Vec::with_capacity(dataset.tables.len());
let dataset_version = match dataset.kind.as_str() {
Expand Down
46 changes: 28 additions & 18 deletions crates/bin/nozzle/src/dump_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use common::{
use datafusion::sql::resolve::resolve_table_references;
use dataset_store::DatasetStore;
use datasets_common::version::Version;
use datasets_derived::{
DATASET_KIND as DERIVED_DATASET_KIND, sql_dataset::DATASET_KIND as SQL_DATASET_KIND,
};
use metadata_db::MetadataDb;
use monitoring::telemetry;
use static_assertions::const_assert;
use tracing::{info, warn};

pub async fn dump(
config: Arc<Config>,
Expand Down Expand Up @@ -51,18 +53,19 @@ pub async fn dump(
}

let dump_order: Vec<&str> = datasets.iter().map(|d| d.as_str()).collect();
info!("dump order: {}", dump_order.join(", "));
tracing::info!("dump order: {}", dump_order.join(", "));

let mut physical_datasets = vec![];
for dataset_name in datasets {
let (dataset_name, version) =
if let Some((name, version_str)) = dataset_name.split_once("__") {
match Version::from_version_identifier(version_str) {
Ok(v) => (name, Some(v)),
Err(e) => {
warn!(
Err(err) => {
tracing::warn!(
"Skipping dataset {} due to invalid version: {}",
dataset_name, e
dataset_name,
err
);
continue;
}
Expand All @@ -71,15 +74,17 @@ pub async fn dump(
(dataset_name.as_str(), None)
};
let dataset = dataset_store
.load_dataset(&dataset_name, version.as_ref())
.await?;
.get_dataset(&dataset_name, version.as_ref())
.await?
.ok_or_else(|| format!("Dataset '{}' not found", dataset_name))?;
let mut tables = Vec::with_capacity(dataset.tables.len());

if matches!(dataset.kind.as_str(), "sql" | "manifest") {
let table_names: Vec<&str> = dataset.tables.iter().map(|t| t.name()).collect();
info!(
tracing::info!(
"Table dump order for dataset {}: {:?}",
dataset_name, table_names
dataset_name,
table_names
);
}

Expand Down Expand Up @@ -166,21 +171,26 @@ pub async fn datasets_and_dependencies(
) -> Result<Vec<String>, BoxError> {
let mut deps: BTreeMap<String, Vec<String>> = Default::default();
while !datasets.is_empty() {
let dataset = store.load_dataset(&datasets.pop().unwrap(), None).await?;
let dataset_name = datasets.pop().unwrap();
let Some(dataset) = store.get_dataset(&dataset_name, None).await? else {
return Err(format!("Dataset '{}' not found", dataset_name).into());
};

let sql_dataset = match dataset.kind.as_str() {
datasets_derived::sql_dataset::DATASET_KIND => {
store.load_sql_dataset(&dataset.name).await?
}
datasets_derived::DATASET_KIND => {
store
.load_manifest_dataset(&dataset.name, dataset.version.as_ref().unwrap())
.await?
}
SQL_DATASET_KIND => store
.get_sql_dataset(&dataset.name, dataset.version.as_ref())
.await?
.ok_or_else(|| format!("SQL dataset '{}' not found", dataset.name))?,
DERIVED_DATASET_KIND => store
.get_sql_dataset(&dataset.name, dataset.version.as_ref())
.await?
.ok_or_else(|| format!("Derived dataset '{}' not found", dataset.name))?,
_ => {
deps.insert(dataset.name.to_string(), vec![]);
continue;
}
};

let mut refs: Vec<String> = Default::default();
for query in sql_dataset.queries.values() {
let (tables, _) = resolve_table_references(query, true)?;
Expand Down
1 change: 1 addition & 0 deletions crates/core/dataset-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde.workspace = true
serde_json.workspace = true
substreams-datasets = { path = "../../extractors/substreams" }
thiserror.workspace = true
tokio.workspace = true
toml = { workspace = true, features = ["preserve_order"] }
tracing.workspace = true
url.workspace = true
Expand Down
68 changes: 68 additions & 0 deletions crates/core/dataset-store/src/block_stream_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use async_stream::stream;
use common::{BlockNum, BlockStreamer, BoxError, RawDatasetRows};
use futures::Stream;

/// Crate-internal wrapper that unifies the concrete block-stream client
/// implementations behind a single enum, so callers can hold "some block
/// streamer" without being generic over the backend.
///
/// Each variant owns the client type exported by the corresponding
/// extractor crate.
#[derive(Clone)]
pub(crate) enum BlockStreamClient {
    // JSON-RPC client from the EVM extractor crate.
    EvmRpc(evm_rpc_datasets::JsonRpcClient),
    // Ethereum beacon (consensus-layer) client.
    EthBeacon(eth_beacon_datasets::BeaconClient),
    // Firehose client.
    Firehose(firehose_datasets::Client),
    // Substreams client.
    Substreams(substreams_datasets::Client),
}

/// Delegates every `BlockStreamer` operation to whichever concrete client
/// this enum currently wraps.
impl BlockStreamer for BlockStreamClient {
    /// Streams raw dataset rows from `start_block` to `end_block` by
    /// forwarding to the wrapped client's `block_stream`.
    ///
    /// NOTE(review): whether `end_block` is inclusive is determined by the
    /// underlying clients / the `BlockStreamer` trait contract — confirm
    /// against those definitions, it is not visible here.
    async fn block_stream(
        self,
        start_block: BlockNum,
        end_block: BlockNum,
    ) -> impl Stream<Item = Result<RawDatasetRows, BoxError>> + Send {
        // Each client returns a different concrete stream type, so we
        // use the `async_stream::stream!` macro to erase them into one
        // wrapper stream: every arm awaits the inner stream and re-yields
        // its items (`Result<RawDatasetRows, BoxError>`) unchanged.
        stream! {
            match self {
                Self::EvmRpc(client) => {
                    let stream = client.block_stream(start_block, end_block).await;
                    for await item in stream {
                        yield item;
                    }
                }
                Self::EthBeacon(client) => {
                    let stream = client.block_stream(start_block, end_block).await;
                    for await item in stream {
                        yield item;
                    }
                }
                Self::Firehose(client) => {
                    let stream = client.block_stream(start_block, end_block).await;
                    for await item in stream {
                        yield item;
                    }
                }
                Self::Substreams(client) => {
                    let stream = client.block_stream(start_block, end_block).await;
                    for await item in stream {
                        yield item;
                    }
                }
            }
        }
    }

    /// Returns the latest block number reported by the wrapped client.
    /// Takes `&mut self` because the trait requires it; whether any variant
    /// actually mutates state is internal to the extractor crates.
    async fn latest_block(&mut self) -> Result<BlockNum, BoxError> {
        match self {
            Self::EvmRpc(client) => client.latest_block().await,
            Self::EthBeacon(client) => client.latest_block().await,
            Self::Firehose(client) => client.latest_block().await,
            Self::Substreams(client) => client.latest_block().await,
        }
    }

    /// Returns the wrapped client's provider name (e.g. for logging/metrics
    /// — presumably; verify against callers).
    fn provider_name(&self) -> &str {
        match self {
            Self::EvmRpc(client) => client.provider_name(),
            Self::EthBeacon(client) => client.provider_name(),
            Self::Firehose(client) => client.provider_name(),
            Self::Substreams(client) => client.provider_name(),
        }
    }
}
Loading
Loading