storage: real-time recency MVP #25195
Conversation
@MaterializeInc/testing Could use y'all's expertise in developing a testing suite against this prototype of real-time recency. The idea behind RTR is that when you issue an RTR query, we wait until we've ingested all data that is visible in the external system at the "same" physical time that the query gets issued. Ideally, we would:

The delay we introduce would need to be long enough that we can convince ourselves that we're not just "getting lucky." Which testing framework would you recommend for this? My guess is instrumenting the Kafka source with a

One small wrinkle with that approach is that I would ultimately like to query multiple Kafka sources, each with its own distinct latency, and getting that wired up with
For the ingestion lag, Toxiproxy is a good fit; we already use it in some tests, and you can make ingestion as slow as you want.
This is doable with Toxiproxy: you can have multiple Kafka sources, each with its own delay; see for example workflow_sink_networking. If this is too cumbersome, I can also write a test for you.
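To make the Toxiproxy approach concrete, here is a minimal sketch of the payloads its HTTP control API accepts for per-source delays. The proxy names, the ports, and the `kafka:9092` upstream are assumptions for illustration; the `/proxies` endpoints and the `latency` toxic type are Toxiproxy's standard API.

```python
# Default Toxiproxy control endpoint (assumption for this sketch).
TOXIPROXY_API = "http://localhost:8474"

def kafka_proxy_config(name: str, listen: str, upstream: str, latency_ms: int):
    """Build the two Toxiproxy payloads for one delayed Kafka source.

    POST the first to TOXIPROXY_API + "/proxies" and the second to
    "/proxies/<name>/toxics". Each source gets its own proxy, so each
    can carry a distinct ingestion delay.
    """
    proxy = {"name": name, "listen": listen, "upstream": upstream, "enabled": True}
    toxic = {
        "name": f"{name}-latency",
        "type": "latency",       # Toxiproxy's built-in latency toxic
        "stream": "downstream",  # delay data flowing toward the consumer
        "attributes": {"latency": latency_ms, "jitter": 0},
    }
    return proxy, toxic

# Two Kafka sources, each behind its own proxy with a distinct delay.
fast = kafka_proxy_config("kafka-fast", "0.0.0.0:19092", "kafka:9092", 100)
slow = kafka_proxy_config("kafka-slow", "0.0.0.0:29092", "kafka:9092", 5000)
```

Pointing each source's broker address at its proxy's `listen` port then gives each source its own controllable ingestion lag.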
Force-pushed 34cbeae to 51f68da.
@def- This is in a shape where we can begin testing it. The RTR operation itself is expensive, so opening 100 (1000? 10?) clients doing simultaneous RTR queries might fall over in some unexpected way. The most meaningful tests will ensure we uphold our semantics; performance is a secondary concern. I'm going to work on seeing whether there's a way to make this the default for testing queries from Kafka sources, but that seems a little tricky.
Force-pushed 51f68da to 06e67d8.
@nrainer-materialize Here's the real-time recency work we chatted about expanding the tests for. I believe @def- had some outstanding tests, but I'm not sure exactly what he'd planned or had in mind.
Thanks! I will have a look and work on it next week.
@rjobanp Do you have bandwidth/interest in adding a MySQL real-time recency test akin to the Kafka and PG ones? I'm sure you're much defter than I am with their dialect of SQL, so thought I'd ask. nbd if you're bandwidth-constrained.
@sploiselle based on the pg-rtr test it should be almost identical for you to add a mysql one -- the only change is that
Oh, I do get some errors for #25566 in https://buildkite.com/materialize/nightly/builds/7683. I haven't had time to take a deeper look yet, though.
Alright. Regarding testing:

1) RTR enabled everywhere (#25463), originally by Dennis, enables RTR in all of CI. This PR is a one-shot test and is not supposed to be merged.
- Tests pipeline
- Nightly

2) Additional tests (#25566), originally by Dennis, adds additional RTR tests and extends existing test frameworks (data-ingest, parallel-workload). That branch is based on the one of this PR. All additional commits of that branch should be cherry-picked to this branch (except for 719ac30, which was cherry-picked from main and could cause conflicts).
- Added tests in tests pipeline: of the added tests, some do not pass but presumably should. They are in workflow resumption.
- Nightly: the nightly pipeline (nearly) passes after adjusting some timeouts.

Summary: to sum up, there still seem to be some issues:

Let me know if you have any questions!
@nrainer-materialize Thank you SO much for the detailed reporting here. The nightly timeouts make sense to me, but I'm pretty spooked by the result mismatches.
@sploiselle, please let me know if you need further support or want me to retest something. Thank you.
@nrainer-materialize Will do! I'm making my way through the feedback you provided, and so far all of the issues have been either issues of scale or subtle issues with the tests themselves. I'm planning a detailed accounting of what's to be done, but I'm feeling good that this prototype is in the right shape.
A bit of a mess to clean up from integrating QA's tests, as well as notes to tighten up.
src/storage-controller/src/lib.rs
Outdated
    (connection.clone(), *remap_collection_id)
}

// These internal sources will never support RTR.
Never is too strong a word; maybe they should support RTR?
And lo, it came to pass...
We should produce an error here as there are no recency guarantees we can offer
Risk Summary: The risk score for this pull request is high at 83, indicating a significant likelihood of introducing a bug. This assessment is driven by predictors such as the average line count and the number of executable lines within files. There are 8 modified files that are known hotspots for bugs, which increases the risk. Historically, pull requests with similar characteristics are 124% more likely to cause a bug compared to the repository's baseline. While the repository's predicted bug trend is decreasing, which is a positive sign, the observed bug trend remains steady.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:
The performance impact mentioned here is largely (entirely?) related to latency. We expect the latency of RTR queries to exceed that of non-RTR queries commensurate with the speed at which we ingest data and propagate the ingestion through the entire system. The prototype itself doesn't introduce much undue latency to the queries (i.e., envd cannot determine that a value has been ingested much more quickly than by listening to the remap shard).
The scalability of the prototype is not great! We open a separate connection to the upstream object for each query. I expect the number of concurrent queries we support to be quite low; this is something we can both get customer feedback on and easily improve. (E.g., an easy win is to allow only one outstanding RTR query per connection object and stash all queries waiting on that object in a queue. Once the current query returns, we can issue another RTR timestamp fetch, and the timestamp returned is valid for all queued queries.)
The resumption tests are, unfortunately, pathologically structured and can never succeed. One of the tenets of the current prototype is that we use the same connection object as the source itself (i.e., it must have the same parameters, etc.). The current design of the resumption tests introduces a partition (of sorts) between envd and the source's upstream object and then issues an RTR query. Unfortunately, this partition not only affects the ingestion but also impedes our ability to determine the RTR timestamp (which is why all of the queries fail). A trickier design here would be to issue the RTR query, then introduce the issue, and then fix it. The timing of all of this seems very racy, though, so I'm not sure exactly how we'd want to instrument it.
@MaterializeInc/storage This is ready for a code review. Note that this doesn't include a
I can add some if we consider that important! |
@sploiselle: I added mysql-rtr tests with 06b63ed.
@nrainer-materialize tysm!
@@ -2016,22 +2016,27 @@ impl Coordinator {
// data. We "cheat" a little bit and filter out any IDs that aren't
// user objects because we know they are not a RTR source.
let mut to_visit = VecDeque::from_iter(source_ids.filter(GlobalId::is_user));
let mut visited = BTreeSet::new();
// If none of the sources are user objects, we don't support RTR.
Suggested change:
- // If none of the sources are user objects, we don't support RTR.
+ // If none of the sources are user objects, we don't need to provide a RTR timestamp.
What happens in the caller though? The query returns immediately? Or times out because it can't get a timestamp to wait for?
Do we have a test case for this? If not, could one be added easily?
What happens in the caller though? The query returns immediately? Or times out because it can't get a timestamp to wait for?
Their query is not influenced by any external system's frontier, so the RTR machinery just isn't involved in determining a timestamp. We can test this by querying something that doesn't depend on a user source and showing that it returns values; it certainly doesn't hurt to demonstrate.
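Such a test could look roughly like the sketch below: run a query that touches no user sources with RTR enabled and fail if it doesn't return promptly. `returns_promptly`, its timeout, and `run_query` are hypothetical stand-ins for whatever the test harness actually provides.

```python
import concurrent.futures

def returns_promptly(run_query, timeout_s=5.0):
    """Run a query and fail if it doesn't finish well within the RTR
    timeout, i.e. show that RTR never waited on an external frontier."""
    ex = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        return ex.submit(run_query).result(timeout=timeout_s)
    finally:
        ex.shutdown(wait=False)

# e.g.: returns_promptly(lambda: cursor.execute("SELECT 1"))
# with real_time_recency enabled on the connection (hypothetical usage).
```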
test/kafka-rtr/mzcompose.py
Outdated
@@ -59,8 +59,10 @@ def workflow_simple(c: Composition) -> None:
)

# TODO: All failure modes fail with: timed out before ingesting the source's visible frontier when real-time-recency query issued. input_1 failed to ingest data up to the real-time recency point
def workflow_resumption(c: Composition) -> None:
    # It is impossible for this test to succeed because the network failures
More accurate to say that this can't pass without making the RTR-related connections robust to network failure?
Is this something that would only make sense to revisit in the context of the work to add HA/"always on" behavior? Or could resuming these broken connections add value even before the HA work is done?
If this is correct, should we open an issue to work on this at some point and add it to the mega tracker?
For this to work, we would have to do something other than blithely partition envd away from the external system. If you partition envd away from the external system, you cannot reach out to it to determine its frontier. For this test to work, we would need to do something trickier, like only partition us away from the external system after the thread performing the RTR determination has received a response from the query that determines the external frontier.
What is this meant to test though? If it can never be fixed do we need this function in the codebase?
Fine with me.
test/pg-rtr/rtr/verify-rtr.td
Outdated
INSERT INTO table_a SELECT 1,2 FROM generate_series(1, 100);
INSERT INTO table_b SELECT 1,2 FROM generate_series(1, 100);

> SELECT sum < 4000207 FROM sum;
This has the potential to flake, right? It relies on this query executing fast enough, before the inserts in pg above are ingested. Maybe we should only test that the correct result is served on the first try.
INSERT INTO table_a SELECT 1,2 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 100;
INSERT INTO table_b SELECT 1,2 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 100;

> SELECT sum < 4000207 FROM sum;
Same concern here for flakes
t.join()


def workflow_multithreaded(c: Composition) -> None:
What are we trying to test with this multithreaded setup? Can we record the intention in the doc comment for this workflow?
@MaterializeInc/testing PTAL
src/storage-types/src/sources.rs
Outdated
async fn decode_remap_data_until_geq_external_frontier<
    ExternalFrontier: SourceTimestamp,
    T: Timestamp + Lattice + Codec64 + From<EpochMillis>,
The logic in the body does not work for any Lattice timestamp. We either need a TotalOrder bound here or (my preference) should just make this specific to MzTimestamp.
Do you have pointers for concretizing this to mz_repr::Timestamp? The fact that the storage controller is made available through a trait makes this hard to do.
) -> BoxFuture<'static, T> {
    // Dummy implementation
    Box::pin(async { T::minimum() })

ids: BTreeSet<GlobalId>,
The doc comment still refers to this argument as source_ids. Does the name change imply that this can contain more than sources?
Clarified the comment
Adapter parts lgtm.
Force-pushed ed0804d to aead2fd.
match connection {
    GenericSourceConnection::Kafka(kafka) => {
        let external_frontier = kafka
            .fetch_write_frontier(&config)
            .await
            .map_err(StorageError::Generic)?;

        decode_remap_data_until_geq_external_frontier(
            id,
            external_frontier,
            as_of,
            remap_subscribe,
        )
        .await
    }
Not covered by any test according to the coverage report
Is it possible the coverage report is mistaken? The Kafka RTR tests should be using this.
Yes, that's definitely an option! The report is not 100% accurate.
GenericSourceConnection::MySql(my_sql) => {
    let external_frontier = my_sql
        .fetch_write_frontier(&config)
        .await
        .map_err(StorageError::Generic)?;

    decode_remap_data_until_geq_external_frontier(
        id,
        external_frontier,
        as_of,
        remap_subscribe,
    )
    .await
Not covered by any test according to the coverage report
Ditto #25195 (comment)
@@ -120,6 +129,61 @@ impl<C: ConnectionAccess> KafkaSourceConnection<C> {
    }
}

impl KafkaSourceConnection {
Class not covered by any test according to the coverage report?
Can you double-check my thinking here that these should be covered by the Kafka RTR tests? Glad to hop on a Zoom to hammer this out.
@@ -135,6 +135,36 @@ pub static MYSQL_PROGRESS_DESC: Lazy<RelationDesc> = Lazy::new(|| {
    .with_column("transaction_id", ScalarType::UInt64.nullable(true))
});

impl MySqlSourceConnection {
Class not covered by any test according to the coverage report?
Or should this be covered by the MySQL tests that I added?
I inadvertently lost the diff that contained your tests; recovered it and they're back in this PR.
Force-pushed b362abb to 0840961.
@petrosagg Addressed all of your feedback modulo erroring when RTR is enabled and a load generator source is queried. I think the ergonomics of that are less friendly than they could be (we let you query everything else that doesn't actually support RTR; making this an exception feels odd to me). I'd prefer to educate users about those semantics in documentation. Also, I gave up on semantic commits and everything is just in one commit now. Apologies if that introduces any overhead for you.
Looks good, thanks for addressing the comments
MVP for real-time recency. Once this merges, it should be appropriate for private preview.
Motivation
This PR adds a known-desirable feature. MaterializeInc/database-issues#4632
Checklist
$T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.