From 116e8c95d9e1847d3a67a12cb2a2b62cc9ff7e9a Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 23 Feb 2026 08:47:39 -0700 Subject: [PATCH 1/2] fix(datasets-derived): remove start_block from derived dataset manifests Derived datasets now inherit start_block from their dependencies at runtime rather than specifying it in the manifest. This is a prerequisite for multi-network dataset materialization, since that is incompatible with a single start_block configuration. --- crates/core/common/src/datasets_derived.rs | 1 - crates/core/datasets-derived/src/dataset.rs | 5 +- crates/core/datasets-derived/src/manifest.rs | 6 +- .../worker-datasets-derived/src/dataset.rs | 36 +-------- .../services/admin-api/src/handlers/common.rs | 36 --------- docs/manifest-schemas/derived.spec.json | 10 --- .../tests/it_admin_api_datasets_register.rs | 79 ++----------------- 7 files changed, 9 insertions(+), 164 deletions(-) diff --git a/crates/core/common/src/datasets_derived.rs b/crates/core/common/src/datasets_derived.rs index e6cb33543..8e3e2fc94 100644 --- a/crates/core/common/src/datasets_derived.rs +++ b/crates/core/common/src/datasets_derived.rs @@ -74,7 +74,6 @@ pub fn dataset(reference: HashReference, manifest: Manifest) -> Result, - pub(crate) start_block: Option, pub(crate) kind: DerivedDatasetKind, pub(crate) dependencies: BTreeMap, pub(crate) functions: Vec, @@ -32,7 +31,6 @@ impl Dataset { reference: HashReference, dependencies: BTreeMap, kind: DerivedDatasetKind, - start_block: Option, finalized_blocks_only: bool, tables: Vec, functions: Vec, @@ -41,7 +39,6 @@ impl Dataset { reference, dependencies, kind, - start_block, finalized_blocks_only, tables, functions, @@ -97,7 +94,7 @@ impl datasets_common::dataset::Dataset for Dataset { } fn start_block(&self) -> Option { - self.start_block + None } fn finalized_blocks_only(&self) -> bool { diff --git a/crates/core/datasets-derived/src/manifest.rs b/crates/core/datasets-derived/src/manifest.rs index f55df6011..57571e44f 100644 --- a/crates/core/datasets-derived/src/manifest.rs +++ b/crates/core/datasets-derived/src/manifest.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; // Re-export schema types from datasets-common pub use datasets_common::manifest::{ArrowSchema, Field, Function, FunctionSource, TableSchema}; -use datasets_common::{block_num::BlockNum, network_id::NetworkId, table_name::TableName}; +use datasets_common::{network_id::NetworkId, table_name::TableName}; use crate::{ dataset_kind::DerivedDatasetKind, @@ -29,10 +29,6 @@ pub struct Manifest { /// Dataset kind, must be `manifest` pub kind: DerivedDatasetKind, - /// Dataset start block - #[serde(default)] - pub start_block: Option, - /// External dataset dependencies with version requirements #[serde(default)] pub dependencies: BTreeMap, diff --git a/crates/core/worker-datasets-derived/src/dataset.rs b/crates/core/worker-datasets-derived/src/dataset.rs index 077262504..9e6d46ebd 100644 --- a/crates/core/worker-datasets-derived/src/dataset.rs +++ b/crates/core/worker-datasets-derived/src/dataset.rs @@ -403,17 +403,6 @@ pub enum Error { /// the beginning. #[error("Parallel table dump tasks failed")] ParallelTasksFailed(#[source] TryWaitAllError), - - /// Start block before dependencies - /// - /// This occurs when the start block of the derived dataset is before the earliest block of the dependencies. - #[error( - "derived dataset start_block ({dataset_start_block}) is before dependency's earliest available block ({dependency_earliest_block})" - )] - StartBlockBeforeDependencies { - dataset_start_block: BlockNum, - dependency_earliest_block: BlockNum, - }, } impl Error { @@ -505,7 +494,6 @@ async fn dump_table( .map_err(DumpTableError::CreatePhysicalCatalog)? }; let planning_ctx = PlanContext::new(env.session_config.clone(), catalog.logical().clone()); - let manifest_start_block = manifest.start_block; join_set.spawn( async move { @@ -530,16 +518,8 @@ async fn dump_table( return Ok::<(), DumpTableSpawnError>(()); }; - // If derived dataset has a start_block, validate it - if let Some(dataset_start_block) = &manifest_start_block - && dataset_start_block < &dependency_earliest_block - { - return Err(DumpTableSpawnError::StartBlockBeforeDependencies { - dataset_start_block: *dataset_start_block, - dependency_earliest_block, - }); - } - let start = manifest_start_block.unwrap_or(dependency_earliest_block); + // Derived datasets inherit start_block from their dependencies + let start = dependency_earliest_block; let resolved = resolve_end_block(&end, start, async { let query_ctx = @@ -750,18 +730,6 @@ pub enum DumpTableSpawnError { #[error("failed to get earliest block: {0}")] EarliestBlock(#[source] EarliestBlockError), - /// Dataset start block is before the earliest dependency block - /// - /// This occurs when the manifest specifies a start block that is - /// earlier than the earliest available data in the dependencies. - #[error( - "earliest block is before start block: {dataset_start_block} < {dependency_earliest_block}" - )] - StartBlockBeforeDependencies { - dataset_start_block: BlockNum, - dependency_earliest_block: BlockNum, - }, - /// Failed to resolve the end block for dumping /// /// This occurs when determining the target end block for the diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index fdb52733a..3de786b1e 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -5,7 +5,6 @@ use std::collections::BTreeMap; use amp_data_store::{DataStore, PhyTableRevision}; use amp_datasets_registry::error::ResolveRevisionError; use common::{ - BlockNum, catalog::logical::for_manifest_validation::{ self as catalog, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError, TableReferencesMap, @@ -257,27 +256,6 @@ pub async fn validate_derived_manifest( dependencies.insert(alias.clone(), reference); } - // Check if the start block is before the earliest available block of the dependencies - if let Some(dataset_start_block) = &manifest.start_block { - for (alias, dataset_ref) in &dependencies { - let dataset = store.get_dataset(dataset_ref).await.map_err(|err| { - ManifestValidationError::FetchDependencyDataset { - alias: alias.to_string(), - source: err, - } - })?; - - if let Some(dep_start_block) = dataset.start_block() - && *dataset_start_block < dep_start_block - { - return Err(ManifestValidationError::StartBlockBeforeDependencies { - dataset_start_block: *dataset_start_block, - dependency_earliest_block: dep_start_block, - }); - } - } - } - // Step 2: Parse all SQL queries and extract references // Store parsed statements to avoid re-parsing in Step 4 let mut statements: BTreeMap = BTreeMap::new(); @@ -589,20 +567,6 @@ pub enum ManifestValidationError { /// Failed to create DataFusion session configuration #[error("failed to create session config")] SessionConfig(#[source] datafusion::error::DataFusionError), - - /// Start block before dependencies - /// - /// This occurs when the start block of the derived dataset is before - /// the earliest block of one of the dependencies. - #[error( - "derived dataset start_block ({dataset_start_block}) is before dependency's earliest available block ({dependency_earliest_block})" - )] - StartBlockBeforeDependencies { - /// The start block of the derived dataset - dataset_start_block: BlockNum, - /// The earliest available block of the dependency dataset - dependency_earliest_block: BlockNum, - }, } /// Registers all files in a revision with their Amp-specific metadata. diff --git a/docs/manifest-schemas/derived.spec.json b/docs/manifest-schemas/derived.spec.json index db0cbde16..be8fa8626 100644 --- a/docs/manifest-schemas/derived.spec.json +++ b/docs/manifest-schemas/derived.spec.json @@ -30,16 +30,6 @@ "description": "Dataset kind, must be `manifest`", "const": "manifest" }, - "start_block": { - "description": "Dataset start block", - "type": [ - "integer", - "null" - ], - "format": "uint64", - "default": null, - "minimum": 0 - }, "tables": { "description": "Table definitions mapped by table name", "type": "object", diff --git a/tests/src/tests/it_admin_api_datasets_register.rs b/tests/src/tests/it_admin_api_datasets_register.rs index 97d58d6dd..010480daa 100644 --- a/tests/src/tests/it_admin_api_datasets_register.rs +++ b/tests/src/tests/it_admin_api_datasets_register.rs @@ -165,85 +165,16 @@ async fn register_multiple_versions_of_same_dataset_succeeds() { } #[tokio::test] -async fn register_with_start_block_before_dependency_fails() { +async fn register_derived_dataset_succeeds() { //* Given - let ctx = TestCtx::setup("test_register_start_block_before_dep").await; + let ctx = TestCtx::setup("test_register_derived_dataset").await; let namespace = "_".parse::().expect("valid namespace"); - let name = "start_block_invalid" + let name = "derived_dataset" .parse::() .expect("valid dataset name"); let version = "1.0.0".parse::().expect("valid version"); - let mut manifest = create_test_manifest(); - manifest.start_block = Some(0); // eth_firehose starts at 15000000 - let manifest_str = - serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON"); - - //* When - let result = ctx - .register_dataset(&namespace, &name, &version, &manifest_str) - .await; - - //* Then - assert!( - result.is_err(), - "registration should fail when start_block is before dependency" - ); - let err = result.unwrap_err(); - match err { - RegisterError::ManifestValidationError(api_err) => { - assert_eq!(api_err.error_code, "MANIFEST_VALIDATION_ERROR"); - assert!( - api_err.error_message.contains("start_block") - && api_err.error_message.contains("before dependency"), - "Error should mention start_block and dependency: {}", - api_err.error_message - ); - } - _ => panic!("Expected ManifestValidationError, got: {:?}", err), - } -} - -#[tokio::test] -async fn register_with_valid_start_block_succeeds() { - //* Given - let ctx = TestCtx::setup("test_register_valid_start_block").await; - let namespace = "_".parse::().expect("valid namespace"); - let name = "start_block_valid" - .parse::() - .expect("valid dataset name"); - let version = "1.0.0".parse::().expect("valid version"); - - let mut manifest = create_test_manifest(); - manifest.start_block = Some(16_000_000); // After eth_firehose's 15000000 - let manifest_str = - serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON"); - - //* When - let result = ctx - .register_dataset(&namespace, &name, &version, &manifest_str) - .await; - - //* Then - assert!( - result.is_ok(), - "registration should succeed with valid start_block: {:?}", - result.err() - ); -} - -#[tokio::test] -async fn register_with_start_block_equal_to_dependency_succeeds() { - //* Given - let ctx = TestCtx::setup("test_register_start_block_equal").await; - let namespace = "_".parse::().expect("valid namespace"); - let name = "start_block_equal" - .parse::() - .expect("valid dataset name"); - let version = "1.0.0".parse::().expect("valid version"); - - let mut manifest = create_test_manifest(); - manifest.start_block = Some(15_000_000); // Equal to eth_firehose's start_block + let manifest = create_test_manifest(); let manifest_str = serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON"); @@ -255,7 +186,7 @@ async fn register_with_start_block_equal_to_dependency_succeeds() { //* Then assert!( result.is_ok(), - "registration should succeed with start_block equal to dependency: {:?}", + "registration should succeed for derived dataset: {:?}", result.err() ); } From 0995bb5a8e9221d9c24a48747005eca6b298deda Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 23 Feb 2026 09:23:58 -0700 Subject: [PATCH 2/2] fix TS --- typescript/amp/src/ManifestBuilder.ts | 1 - typescript/amp/src/Model.ts | 2 -- typescript/amp/test/Amp.test.ts | 41 --------------------------- 3 files changed, 44 deletions(-) diff --git a/typescript/amp/src/ManifestBuilder.ts b/typescript/amp/src/ManifestBuilder.ts index e6cbb4e78..ca9566074 100644 --- a/typescript/amp/src/ManifestBuilder.ts +++ b/typescript/amp/src/ManifestBuilder.ts @@ -124,7 +124,6 @@ export class ManifestBuilder extends Effect.Service()("Amp/Mani const manifest = new Model.DatasetDerived({ kind: "manifest", - startBlock: config.startBlock, dependencies: config.dependencies, tables: Object.fromEntries(tables), functions: Object.fromEntries(functions), diff --git a/typescript/amp/src/Model.ts b/typescript/amp/src/Model.ts index 7f936fb72..b6f474979 100644 --- a/typescript/amp/src/Model.ts +++ b/typescript/amp/src/Model.ts @@ -258,7 +258,6 @@ export class DatasetConfig extends Schema.Class("DatasetConfig")( sources: Schema.Array(DatasetSource).pipe(Schema.optional), license: DatasetLicense.pipe(Schema.optional), private: Schema.Boolean.pipe(Schema.optional), - startBlock: Schema.Number.pipe(Schema.optional), dependencies: Schema.Record({ key: Schema.String, value: DatasetReferenceFromString }), tables: Schema.Record({ key: Schema.String, value: TableDefinition }).pipe(Schema.optional), functions: Schema.Record({ key: Schema.String, value: FunctionDefinition }).pipe(Schema.optional), @@ -328,7 +327,6 @@ export class FunctionManifest extends Schema.Class("FunctionMa export class DatasetDerived extends Schema.Class("DatasetDerived")({ kind: Schema.Literal("manifest"), - startBlock: Schema.NullOr(Schema.Number).pipe(Schema.optional, Schema.fromKey("start_block")), dependencies: Schema.Record({ key: Schema.String, value: DatasetReferenceFromString }), tables: Schema.Record({ key: Schema.String, value: Table }), functions: Schema.Record({ key: Schema.String, value: FunctionManifest }), diff --git a/typescript/amp/test/Amp.test.ts b/typescript/amp/test/Amp.test.ts index 3e33e8563..f596a6fe1 100644 --- a/typescript/amp/test/Amp.test.ts +++ b/typescript/amp/test/Amp.test.ts @@ -144,47 +144,6 @@ Testing.layer((it) => { }), ) - it.effect( - "register and deploy derived dataset with start_block", - Effect.fn(function*() { - const admin = yield* Admin.Admin - const fixtures = yield* Fixtures.Fixtures - const flight = yield* ArrowFlight.ArrowFlight - - // Load the example manifest and add a start_block - const baseDataset = yield* fixtures.load("manifest.json", Model.DatasetDerived) - const dataset = new Model.DatasetDerived({ - ...baseDataset, - startBlock: 4, - }) - - const namespace = Model.DatasetNamespace.make("_") - const name = Model.DatasetName.make("example_with_start_block") - const version = Model.DatasetVersion.make("0.0.1") - - // Register and deploy - yield* admin.registerDataset(namespace, name, dataset, version) - const job = yield* admin.deployDataset(namespace, name, version, { - endBlock: "5", - }) - - // Wait for job to complete - yield* Testing.waitForJobCompletion(job.jobId) - - // Verify registration - const response = yield* admin.getDatasetVersion(namespace, name, version) - assertEquals(response.name, "example_with_start_block") - - // Query the deployed dataset to verify it works - const stream = flight.query(` - SELECT block_num FROM "_/example_with_start_block@0.0.1".counts ORDER BY block_num ASC LIMIT 1 - `) - const data = new Table(yield* Stream.runCollect(stream).pipe(Effect.map(Chunk.toArray))) - const rows = data.toArray() - assertSome(Array.head(rows).pipe(Option.map(Struct.get("block_num"))), BigInt(4)) - }), - ) - it.effect( "can fetch a list of datasets", Effect.fn(function*() {