aaaa611 to cb46136
crates/core/datasets-raw/src/rows.rs
Outdated
| if rows | ||
| .schema() | ||
| .column_with_name(RESERVED_TS_COLUMN_NAME) | ||
| .is_some() | ||
| { | ||
| let ts_col = rows | ||
| .column_by_name(RESERVED_TS_COLUMN_NAME) | ||
| .ok_or(CheckInvariantsError::MissingTsColumn)?; | ||
| if !matches!( | ||
| ts_col.data_type(), | ||
| DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_ref() == "+00:00" | ||
| ) { | ||
| return Err(CheckInvariantsError::InvalidTsColumnType); | ||
| } | ||
| } |
- if rows
-     .schema()
-     .column_with_name(RESERVED_TS_COLUMN_NAME)
-     .is_some()
- {
-     let ts_col = rows
-         .column_by_name(RESERVED_TS_COLUMN_NAME)
-         .ok_or(CheckInvariantsError::MissingTsColumn)?;
-     if !matches!(
-         ts_col.data_type(),
-         DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_ref() == "+00:00"
-     ) {
-         return Err(CheckInvariantsError::InvalidTsColumnType);
-     }
- }
+ if let Some(ts_col) = rows.column_by_name(RESERVED_TS_COLUMN_NAME)
+     && !matches!(
+         ts_col.data_type(),
+         DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_ref() == "+00:00"
+     )
+ {
+     return Err(CheckInvariantsError::InvalidTsColumnType);
+ }
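To illustrate why the single-lookup form is equivalent, here is a minimal sketch using stand-in types. `Rows`, `DataType`, `TimeUnit` and `check_ts_column` below are hypothetical simplifications, not the Arrow types or the crate's actual function. The sketch uses a nested `if` rather than a let chain so that it also compiles on editions before Rust 2024.

```rust
// Stand-in types for illustration only; the real code uses Arrow's
// `DataType` and `TimeUnit` and the crate's own error enum.
#[derive(Debug, PartialEq)]
enum TimeUnit {
    Microsecond,
    Nanosecond,
}

#[derive(Debug, PartialEq)]
enum DataType {
    Int64,
    Timestamp(TimeUnit, Option<String>),
}

#[derive(Debug, PartialEq)]
enum CheckInvariantsError {
    InvalidTsColumnType,
}

const RESERVED_TS_COLUMN_NAME: &str = "_ts";

/// Hypothetical batch: a list of (column name, data type) pairs.
struct Rows {
    columns: Vec<(String, DataType)>,
}

impl Rows {
    /// Returns the data type of the named column, if it exists.
    fn column_by_name(&self, name: &str) -> Option<&DataType> {
        self.columns
            .iter()
            .find(|(n, _)| n.as_str() == name)
            .map(|(_, t)| t)
    }
}

fn check_ts_column(rows: &Rows) -> Result<(), CheckInvariantsError> {
    // One lookup: `None` means the column is absent, which is allowed,
    // so no "missing column" error can ever be produced here.
    if let Some(ts_type) = rows.column_by_name(RESERVED_TS_COLUMN_NAME) {
        let is_utc_nanos = matches!(
            ts_type,
            DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_str() == "+00:00"
        );
        if !is_utc_nanos {
            return Err(CheckInvariantsError::InvalidTsColumnType);
        }
    }
    Ok(())
}
```

Because the presence check and the lookup are the same call, there is no branch left that could report a missing column, which is what makes the `MissingTsColumn` variant unnecessary under this suggestion.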
crates/core/datasets-raw/src/rows.rs
Outdated
| /// Required `_ts` column is missing from the record batch | ||
| #[error("missing _ts column")] | ||
| MissingTsColumn, | ||
|
|
This error variant is dead code if the above suggestion is applied.
| /// Required `_ts` column is missing from the record batch | |
| #[error("missing _ts column")] | |
| MissingTsColumn, |
      .parent()
      .unwrap()
-     .join("tests/config/manifests/eth_rpc.json");
+     .join("tests/config/manifests/eth_rpc_generated.json");
Looks like we've added these _generated.json files without removing the old ones. Is this intentional?
The situation is:
- We have tests for the output of `ampctl gen`, and we want new manifests generated with the `_ts` column.
- I still wanted to keep all tests on old manifests to be sure this would not break existing datasets that do not have `_ts`.
So we needed two manifests, at least for now.
crates/core/common/src/udfs/ts.rs
Outdated
| }; | ||
|
|
||
| /// A planning-time sentinel UDF that gets replaced with the appropriate `_ts` | ||
| /// expression during `SystemColumnPropagator::f_up`. Panics if it reaches execution. |
There is an inconsistency here between the name SystemColumnPropagator and WatermarkColumnPropagator. Might be worth doing a pass to consolidate on one term (system column vs watermark column).
| .iter() | ||
| .copied() | ||
| .filter(|wm| { | ||
| let name = wm.column_name(); | ||
| schemas | ||
| .iter() | ||
| .all(|s| s.fields().iter().any(|f| f.name() == name)) | ||
| }) | ||
| .collect() |
We should probably copy after the filter
-     .iter()
-     .copied()
-     .filter(|wm| {
-         let name = wm.column_name();
-         schemas
-             .iter()
-             .all(|s| s.fields().iter().any(|f| f.name() == name))
-     })
-     .collect()
+     .iter()
+     .filter(|wm| {
+         let name = wm.column_name();
+         schemas
+             .iter()
+             .all(|s| s.fields().iter().any(|f| f.name() == name))
+     })
+     .copied()
+     .collect()
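A self-contained sketch of the filter-then-copy ordering, under stated assumptions: the `WatermarkColumn` enum, its `ALL` constant, and `common_watermarks` are hypothetical stand-ins, and each schema is modeled as a plain list of field names rather than an Arrow schema. With `.copied()` placed after `.filter(...)`, only the elements that survive the filter are copied; for a small `Copy` type the difference is likely negligible and mostly a matter of style.

```rust
/// Hypothetical stand-in for the PR's `WatermarkColumn`; assumed to be `Copy`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum WatermarkColumn {
    BlockNum,
    Ts,
}

impl WatermarkColumn {
    /// All known watermark columns (illustrative constant, not from the PR).
    const ALL: [WatermarkColumn; 2] = [WatermarkColumn::BlockNum, WatermarkColumn::Ts];

    fn column_name(self) -> &'static str {
        match self {
            WatermarkColumn::BlockNum => "_block_num",
            WatermarkColumn::Ts => "_ts",
        }
    }
}

/// Returns the watermark columns that are present in every schema.
/// Note: with no schemas at all, `all` is vacuously true and every
/// watermark column is returned.
fn common_watermarks(schemas: &[Vec<&str>]) -> Vec<WatermarkColumn> {
    WatermarkColumn::ALL
        .iter()
        // Filter on references first...
        .filter(|wm| {
            let name = wm.column_name();
            schemas.iter().all(|s| s.iter().any(|f| *f == name))
        })
        // ...then copy only the survivors into owned values.
        .copied()
        .collect()
}
```

In this sketch, a watermark column is kept only if every input schema carries it, which mirrors the backwards-compatibility goal of not surfacing `_ts` when some source lacks it.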
cb46136 to 0dbe2f3
@Theodus thanks for the review, I've addressed your comments.
Addresses #2048.
This PR implements `_ts`, a built-in and auto-propagated timestamp column, just like we do for `_block_num`. The two columns are abstracted by `WatermarkColumn`.
The largest concern with this PR is backwards compatibility: it must not cause immediate schema mismatch errors on existing deployments. So we make sure the `_ts` column is only included when it is available at the source and requested by the consumer of the query.
The remaining piece of #2048 is implementing a temporary hack to recognize the `timestamp` EVM column as equivalent to `_ts`, so we don't have to resync raw datasets to use the feature.