Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/core/common/src/datasets_derived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ pub fn dataset(reference: HashReference, manifest: Manifest) -> Result<Dataset,
reference,
manifest.dependencies,
DerivedDatasetKind,
manifest.start_block,
false,
tables,
functions,
Expand Down
5 changes: 1 addition & 4 deletions crates/core/datasets-derived/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{

pub struct Dataset {
pub(crate) tables: Vec<Table>,
pub(crate) start_block: Option<BlockNum>,
pub(crate) kind: DerivedDatasetKind,
pub(crate) dependencies: BTreeMap<DepAlias, DepReference>,
pub(crate) functions: Vec<Function>,
Expand All @@ -32,7 +31,6 @@ impl Dataset {
reference: HashReference,
dependencies: BTreeMap<DepAlias, DepReference>,
kind: DerivedDatasetKind,
start_block: Option<BlockNum>,
finalized_blocks_only: bool,
tables: Vec<Table>,
functions: Vec<Function>,
Expand All @@ -41,7 +39,6 @@ impl Dataset {
reference,
dependencies,
kind,
start_block,
finalized_blocks_only,
tables,
functions,
Expand Down Expand Up @@ -97,7 +94,7 @@ impl datasets_common::dataset::Dataset for Dataset {
}

fn start_block(&self) -> Option<BlockNum> {
self.start_block
None
}

fn finalized_blocks_only(&self) -> bool {
Expand Down
6 changes: 1 addition & 5 deletions crates/core/datasets-derived/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::BTreeMap;

// Re-export schema types from datasets-common
pub use datasets_common::manifest::{ArrowSchema, Field, Function, FunctionSource, TableSchema};
use datasets_common::{block_num::BlockNum, network_id::NetworkId, table_name::TableName};
use datasets_common::{network_id::NetworkId, table_name::TableName};

use crate::{
dataset_kind::DerivedDatasetKind,
Expand All @@ -29,10 +29,6 @@ pub struct Manifest {
/// Dataset kind, must be `manifest`
pub kind: DerivedDatasetKind,

/// Dataset start block
#[serde(default)]
pub start_block: Option<BlockNum>,

/// External dataset dependencies with version requirements
#[serde(default)]
pub dependencies: BTreeMap<DepAlias, DepReference>,
Expand Down
36 changes: 2 additions & 34 deletions crates/core/worker-datasets-derived/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,6 @@ pub enum Error {
/// the beginning.
#[error("Parallel table dump tasks failed")]
ParallelTasksFailed(#[source] TryWaitAllError<DumpTableError>),

/// Start block before dependencies
///
/// This occurs when the start block of the derived dataset is before the earliest block of the dependencies.
#[error(
"derived dataset start_block ({dataset_start_block}) is before dependency's earliest available block ({dependency_earliest_block})"
)]
StartBlockBeforeDependencies {
dataset_start_block: BlockNum,
dependency_earliest_block: BlockNum,
},
}

impl Error {
Expand Down Expand Up @@ -505,7 +494,6 @@ async fn dump_table(
.map_err(DumpTableError::CreatePhysicalCatalog)?
};
let planning_ctx = PlanContext::new(env.session_config.clone(), catalog.logical().clone());
let manifest_start_block = manifest.start_block;

join_set.spawn(
async move {
Expand All @@ -530,16 +518,8 @@ async fn dump_table(
return Ok::<(), DumpTableSpawnError>(());
};

// If derived dataset has a start_block, validate it
if let Some(dataset_start_block) = &manifest_start_block
&& dataset_start_block < &dependency_earliest_block
{
return Err(DumpTableSpawnError::StartBlockBeforeDependencies {
dataset_start_block: *dataset_start_block,
dependency_earliest_block,
});
}
let start = manifest_start_block.unwrap_or(dependency_earliest_block);
// Derived datasets inherit start_block from their dependencies
let start = dependency_earliest_block;

let resolved = resolve_end_block(&end, start, async {
let query_ctx =
Expand Down Expand Up @@ -750,18 +730,6 @@ pub enum DumpTableSpawnError {
#[error("failed to get earliest block: {0}")]
EarliestBlock(#[source] EarliestBlockError),

/// Dataset start block is before the earliest dependency block
///
/// This occurs when the manifest specifies a start block that is
/// earlier than the earliest available data in the dependencies.
#[error(
"earliest block is before start block: {dataset_start_block} < {dependency_earliest_block}"
)]
StartBlockBeforeDependencies {
dataset_start_block: BlockNum,
dependency_earliest_block: BlockNum,
},

/// Failed to resolve the end block for dumping
///
/// This occurs when determining the target end block for the
Expand Down
36 changes: 0 additions & 36 deletions crates/services/admin-api/src/handlers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::collections::BTreeMap;
use amp_data_store::{DataStore, PhyTableRevision};
use amp_datasets_registry::error::ResolveRevisionError;
use common::{
BlockNum,
catalog::logical::for_manifest_validation::{
self as catalog, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError,
TableReferencesMap,
Expand Down Expand Up @@ -257,27 +256,6 @@ pub async fn validate_derived_manifest(
dependencies.insert(alias.clone(), reference);
}

// Check if the start block is before the earliest available block of the dependencies
if let Some(dataset_start_block) = &manifest.start_block {
for (alias, dataset_ref) in &dependencies {
let dataset = store.get_dataset(dataset_ref).await.map_err(|err| {
ManifestValidationError::FetchDependencyDataset {
alias: alias.to_string(),
source: err,
}
})?;

if let Some(dep_start_block) = dataset.start_block()
&& *dataset_start_block < dep_start_block
{
return Err(ManifestValidationError::StartBlockBeforeDependencies {
dataset_start_block: *dataset_start_block,
dependency_earliest_block: dep_start_block,
});
}
}
}

// Step 2: Parse all SQL queries and extract references
// Store parsed statements to avoid re-parsing in Step 4
let mut statements: BTreeMap<TableName, Statement> = BTreeMap::new();
Expand Down Expand Up @@ -589,20 +567,6 @@ pub enum ManifestValidationError {
/// Failed to create DataFusion session configuration
#[error("failed to create session config")]
SessionConfig(#[source] datafusion::error::DataFusionError),

/// Start block before dependencies
///
/// This occurs when the start block of the derived dataset is before
/// the earliest block of one of the dependencies.
#[error(
"derived dataset start_block ({dataset_start_block}) is before dependency's earliest available block ({dependency_earliest_block})"
)]
StartBlockBeforeDependencies {
/// The start block of the derived dataset
dataset_start_block: BlockNum,
/// The earliest available block of the dependency dataset
dependency_earliest_block: BlockNum,
},
}

/// Registers all files in a revision with their Amp-specific metadata.
Expand Down
10 changes: 0 additions & 10 deletions docs/manifest-schemas/derived.spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@
"description": "Dataset kind, must be `manifest`",
"const": "manifest"
},
"start_block": {
"description": "Dataset start block",
"type": [
"integer",
"null"
],
"format": "uint64",
"default": null,
"minimum": 0
},
"tables": {
"description": "Table definitions mapped by table name",
"type": "object",
Expand Down
79 changes: 5 additions & 74 deletions tests/src/tests/it_admin_api_datasets_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,85 +165,16 @@ async fn register_multiple_versions_of_same_dataset_succeeds() {
}

#[tokio::test]
async fn register_with_start_block_before_dependency_fails() {
async fn register_derived_dataset_succeeds() {
//* Given
let ctx = TestCtx::setup("test_register_start_block_before_dep").await;
let ctx = TestCtx::setup("test_register_derived_dataset").await;
let namespace = "_".parse::<Namespace>().expect("valid namespace");
let name = "start_block_invalid"
let name = "derived_dataset"
.parse::<Name>()
.expect("valid dataset name");
let version = "1.0.0".parse::<Version>().expect("valid version");

let mut manifest = create_test_manifest();
manifest.start_block = Some(0); // eth_firehose starts at 15000000
let manifest_str =
serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON");

//* When
let result = ctx
.register_dataset(&namespace, &name, &version, &manifest_str)
.await;

//* Then
assert!(
result.is_err(),
"registration should fail when start_block is before dependency"
);
let err = result.unwrap_err();
match err {
RegisterError::ManifestValidationError(api_err) => {
assert_eq!(api_err.error_code, "MANIFEST_VALIDATION_ERROR");
assert!(
api_err.error_message.contains("start_block")
&& api_err.error_message.contains("before dependency"),
"Error should mention start_block and dependency: {}",
api_err.error_message
);
}
_ => panic!("Expected ManifestValidationError, got: {:?}", err),
}
}

#[tokio::test]
async fn register_with_valid_start_block_succeeds() {
//* Given
let ctx = TestCtx::setup("test_register_valid_start_block").await;
let namespace = "_".parse::<Namespace>().expect("valid namespace");
let name = "start_block_valid"
.parse::<Name>()
.expect("valid dataset name");
let version = "1.0.0".parse::<Version>().expect("valid version");

let mut manifest = create_test_manifest();
manifest.start_block = Some(16_000_000); // After eth_firehose's 15000000
let manifest_str =
serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON");

//* When
let result = ctx
.register_dataset(&namespace, &name, &version, &manifest_str)
.await;

//* Then
assert!(
result.is_ok(),
"registration should succeed with valid start_block: {:?}",
result.err()
);
}

#[tokio::test]
async fn register_with_start_block_equal_to_dependency_succeeds() {
//* Given
let ctx = TestCtx::setup("test_register_start_block_equal").await;
let namespace = "_".parse::<Namespace>().expect("valid namespace");
let name = "start_block_equal"
.parse::<Name>()
.expect("valid dataset name");
let version = "1.0.0".parse::<Version>().expect("valid version");

let mut manifest = create_test_manifest();
manifest.start_block = Some(15_000_000); // Equal to eth_firehose's start_block
let manifest = create_test_manifest();
let manifest_str =
serde_json::to_string(&manifest).expect("failed to serialize manifest to JSON");

Expand All @@ -255,7 +186,7 @@ async fn register_with_start_block_equal_to_dependency_succeeds() {
//* Then
assert!(
result.is_ok(),
"registration should succeed with start_block equal to dependency: {:?}",
"registration should succeed for derived dataset: {:?}",
result.err()
);
}
Expand Down
1 change: 0 additions & 1 deletion typescript/amp/src/ManifestBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ export class ManifestBuilder extends Effect.Service<ManifestBuilder>()("Amp/Mani

const manifest = new Model.DatasetDerived({
kind: "manifest",
startBlock: config.startBlock,
dependencies: config.dependencies,
tables: Object.fromEntries(tables),
functions: Object.fromEntries(functions),
Expand Down
2 changes: 0 additions & 2 deletions typescript/amp/src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ export class DatasetConfig extends Schema.Class<DatasetConfig>("DatasetConfig")(
sources: Schema.Array(DatasetSource).pipe(Schema.optional),
license: DatasetLicense.pipe(Schema.optional),
private: Schema.Boolean.pipe(Schema.optional),
startBlock: Schema.Number.pipe(Schema.optional),
dependencies: Schema.Record({ key: Schema.String, value: DatasetReferenceFromString }),
tables: Schema.Record({ key: Schema.String, value: TableDefinition }).pipe(Schema.optional),
functions: Schema.Record({ key: Schema.String, value: FunctionDefinition }).pipe(Schema.optional),
Expand Down Expand Up @@ -328,7 +327,6 @@ export class FunctionManifest extends Schema.Class<FunctionManifest>("FunctionMa

export class DatasetDerived extends Schema.Class<DatasetDerived>("DatasetDerived")({
kind: Schema.Literal("manifest"),
startBlock: Schema.NullOr(Schema.Number).pipe(Schema.optional, Schema.fromKey("start_block")),
dependencies: Schema.Record({ key: Schema.String, value: DatasetReferenceFromString }),
tables: Schema.Record({ key: Schema.String, value: Table }),
functions: Schema.Record({ key: Schema.String, value: FunctionManifest }),
Expand Down
41 changes: 0 additions & 41 deletions typescript/amp/test/Amp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,47 +144,6 @@ Testing.layer((it) => {
}),
)

it.effect(
"register and deploy derived dataset with start_block",
Effect.fn(function*() {
const admin = yield* Admin.Admin
const fixtures = yield* Fixtures.Fixtures
const flight = yield* ArrowFlight.ArrowFlight

// Load the example manifest and add a start_block
const baseDataset = yield* fixtures.load("manifest.json", Model.DatasetDerived)
const dataset = new Model.DatasetDerived({
...baseDataset,
startBlock: 4,
})

const namespace = Model.DatasetNamespace.make("_")
const name = Model.DatasetName.make("example_with_start_block")
const version = Model.DatasetVersion.make("0.0.1")

// Register and deploy
yield* admin.registerDataset(namespace, name, dataset, version)
const job = yield* admin.deployDataset(namespace, name, version, {
endBlock: "5",
})

// Wait for job to complete
yield* Testing.waitForJobCompletion(job.jobId)

// Verify registration
const response = yield* admin.getDatasetVersion(namespace, name, version)
assertEquals(response.name, "example_with_start_block")

// Query the deployed dataset to verify it works
const stream = flight.query(`
SELECT block_num FROM "_/example_with_start_block@0.0.1".counts ORDER BY block_num ASC LIMIT 1
`)
const data = new Table(yield* Stream.runCollect(stream).pipe(Effect.map(Chunk.toArray)))
const rows = data.toArray()
assertSome(Array.head(rows).pipe(Option.map(Struct.get("block_num"))), BigInt(4))
}),
)

it.effect(
"can fetch a list of datasets",
Effect.fn(function*() {
Expand Down