Feat: state import/export #4038

Conversation
```python
@cli.group(no_args_is_help=True)
def state() -> None:
```
This is a slight departure from our existing structure, but I decided to group state operations to keep things open for more operations in the future (such as perhaps being able to query state).
So the CLI syntax is `sqlmesh state dump` or `sqlmesh state load` vs. something like `sqlmesh state_dump` or `sqlmesh state_load`.
@treysp I'd be keen to know if this is a direction you were planning to head in given the recent CLI refactoring work.
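For context, a rough standalone sketch of the grouped-command shape with click; the subcommand arguments and bodies here are placeholders, not what this PR actually registers under the main `sqlmesh` CLI group:

```python
import click


@click.group(no_args_is_help=True)
def state() -> None:
    """Commands for reading from / writing to the state database."""


@state.command("dump")
@click.argument("output_file", type=click.Path())
def dump(output_file: str) -> None:
    """Dump state to OUTPUT_FILE (placeholder body)."""
    ...


@state.command("load")
@click.argument("input_file", type=click.Path(exists=True))
def load(input_file: str) -> None:
    """Load state from INPUT_FILE (placeholder body)."""
    ...


if __name__ == "__main__":
    state()
```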
sqlmesh/core/state_sync/base.py (outdated)
```python
self.add_snapshots_intervals([snapshot_intervals])

@abc.abstractmethod
def load(self, stream: StateStream, clear: bool = True) -> None:
```
I decided to make dump/load first-class citizens on `StateSync` rather than trying to coordinate everything over the public interface.
The reason is to allow different `StateSync` implementations to perform local optimizations or call internal methods without exposing them publicly; it also makes it easier to wrap things in transactions.
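A minimal sketch of that idea, under the assumption that the real `StateSync`/`StateStream` interfaces in `sqlmesh/core/state_sync` differ in detail; the in-memory implementation is purely illustrative:

```python
import abc
import contextlib
import typing as t


class StateStream:
    """Placeholder for the streaming state abstraction used by the PR."""

    def __init__(self, snapshots: t.Iterable[t.Any], environments: t.Iterable[t.Any]) -> None:
        self.snapshots = snapshots
        self.environments = environments


class StateSync(abc.ABC):
    @abc.abstractmethod
    def load(self, stream: StateStream, clear: bool = True) -> None:
        """Consume a state stream, optionally clearing any existing state first."""


class InMemoryStateSync(StateSync):
    """Illustrative only; the real implementation writes to the state tables."""

    def __init__(self) -> None:
        self._snapshots: list = []
        self._environments: list = []

    @contextlib.contextmanager
    def _transaction(self) -> t.Iterator[None]:
        # Stand-in for wrapping the whole import in a single database transaction.
        yield

    def load(self, stream: StateStream, clear: bool = True) -> None:
        with self._transaction():
            if clear:
                self._snapshots.clear()
                self._environments.clear()
            # Internal helpers and optimizations can live here without being
            # part of the public StateSync interface.
            self._snapshots.extend(stream.snapshots)
            self._environments.extend(stream.environments)
```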
sqlmesh/core/state_sync/dump_load.py (outdated)
| yield "environments", _dump_environments(state_stream.environments) | ||
| console.update_state_dump_environments(complete=True) | ||
|
|
||
| yield "auto_restatements", _dump_auto_restatements(state_stream.auto_restatements) |
I don't know if we should literally dump our tables 1-to-1. This is way too low level. For example, a complete snapshot instance is assembled using data from the `_snapshots`, `_intervals` and `_auto_restatements` tables. I don't think we want to expose users to all these internals. Instead, I believe it should just be environments, snapshots, versions.
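To make that concrete, a dump at that level of abstraction might look roughly like this (illustrative layout only, not the final schema; the contents of `versions` are an assumption):

```json
{
  "versions": {
    /* e.g. state schema / SQLGlot / SQLMesh versions */
  },
  "snapshots": [
    /* fully assembled Snapshot objects, with intervals and auto-restatement info folded in */
  ],
  "environments": {
    "prod": { /* Environment object */ }
  }
}
```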
Additionally, the format should be compatible with export of the local state.
Regarding auto-restatements, I could see that the `_auto_restatements` table is joined in when calling `get_snapshots()`, but I couldn't see how it was being populated if the snapshots were written back via `push_snapshots()`.
But I guess part of the load could be to extract the auto-restatement information from the Snapshot records themselves and call `update_auto_restatements()` to create the auto-restatement records.
I've improved the import implementation to keep track of the auto-restatements as the snapshots are being inserted and then insert them at the end.
This means the auto-restatements table no longer needs to be written to the state file.
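A rough sketch of that pattern; the attribute and key names are assumptions, not the real Snapshot API, and only `push_snapshots()` and `update_auto_restatements()` come from the discussion above:

```python
import typing as t


def import_snapshots(state_sync: t.Any, snapshots: t.Iterator[t.Any]) -> None:
    auto_restatements: t.Dict[t.Any, t.Any] = {}

    def track(snapshot_iter: t.Iterator[t.Any]) -> t.Iterator[t.Any]:
        # Capture auto-restatement info as each snapshot streams past, so no
        # separate "auto_restatements" section is needed in the state file.
        for snapshot in snapshot_iter:
            next_ts = getattr(snapshot, "next_auto_restatement_ts", None)  # assumed attribute
            if next_ts is not None:
                auto_restatements[getattr(snapshot, "name_version", None)] = next_ts  # assumed key
            yield snapshot

    state_sync.push_snapshots(track(snapshots))
    # Once every snapshot has been inserted, write the collected records in one go.
    state_sync.update_auto_restatements(auto_restatements)
```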
georgesittas left a comment
Did a quick first pass, agree with Iaroslav's comments. Nice 👍
```python
self.snapshot_state.push_snapshots(
    snapshot_iterator, overwrite=overwrite_existing_snapshots
)
self.add_snapshots_intervals((s.snapshot_intervals for s in intervals_iterator))
```
Note that doing it like this will not restore the interval records to their original state at the time of the dump. When we return snapshots from the state we merge intervals by the (name, version) tuple, effectively losing information about which identifier each interval record was associated with. In theory it shouldn't matter, but in practice there's a high likelihood of edge cases.
What is the correct way to handle this?
The reason I took this approach is that it's what the evaluator seems to do.
My assumption was that the fact that `EngineAdapterStateSync` splits intervals out from the snapshots to store them separately and joins them back on load was just an implementation detail.
Yeah, it's a bit more complex than this, unfortunately. I'm sure there are corner cases, but I'm struggling to come up with one right now. Let's keep this as-is for now.
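A toy illustration of what gets lost (made-up rows, not the real interval schema):

```python
# Two interval rows that belonged to different snapshot identifiers collapse
# into one once intervals are merged by (name, version), so a dump/load round
# trip can't recreate the original per-identifier rows.
original_rows = [
    {"name": "db.model_a", "version": "v1", "identifier": "abc123", "interval": (1, 10)},
    {"name": "db.model_a", "version": "v1", "identifier": "def456", "interval": (11, 20)},
]

merged: dict = {}
for row in original_rows:
    key = (row["name"], row["version"])
    start, end = row["interval"]
    prev = merged.get(key)
    merged[key] = (min(prev[0], start), max(prev[1], end)) if prev else (start, end)

print(merged)  # {('db.model_a', 'v1'): (1, 20)} -- the identifiers are gone
```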
sqlmesh/core/state_sync/db/facade.py (outdated)
```python
if not clear and environment.name in existing_environments:
    self.environment_state.update_environment(environment)
else:
    self.promote(environment)
```
I don't follow this. Why not just always update?
So originally I was using `promote()` to write environments back.
But then I needed to support the "merge" strategy, which fails with a plan ID error if you try to overwrite an existing environment using `promote()`, so I switched to using `update_environment()` for those.
But it turns out you're quite right: `update_environment()` works in both cases, so I have simplified this.
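The simplified flow is effectively the following sketch, reusing the names from the diff above:

```python
def import_environments(environment_state, stream) -> None:
    # update_environment() handles both new and pre-existing environments,
    # so there is no promote() branch and no plan ID check to trip over.
    for environment in stream.environments:
        environment_state.update_environment(environment)
```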
sqlmesh/core/state_sync/db/facade.py (outdated)
```python
existing_environments = set(self.get_environments_summary().keys()) if not clear else set()
for environment in stream.environments:
    if not clear and environment.name in existing_environments:
        self.environment_state.update_environment(environment)
```
This doesn't update the "environment statements" (`before_all` / `after_all`).
Oh, thanks for spotting. I've adjusted it to take the `EnvironmentStatements` into account too.
This prompted a change to the environments dict in the state file, which now looks like:

```json
"environments": {
  "prod": {
    "environment": {
      /* Environment object */
    },
    "statements": [
      /* list of EnvironmentStatements objects for the environment */
    ]
  },
  "dev": {
    /*...*/
  }
}
```
| "sqlglot[rs]~=26.12.0", | ||
| "tenacity", | ||
| "time-machine", | ||
| "json-stream" |
This doesn't look particularly alive: https://github.com/daggaz/json-stream. Are we sure we want to depend on this?
The last release was Jan 12th, 2025: https://pypi.org/project/json-stream/
It seemed more alive than some of the other stuff we depend on.
```python
def _new_handle() -> StreamingJSONObject:
    handle = input_file.open("r", encoding="utf8")
    handles.append(handle)
    stream = json_stream.load(handle)
```
Should we use `persistent=True` and hence avoid creating a separate file handle per top-level field? Our state in raw JSON format doesn't use that much memory (correct me if I'm wrong).
That effectively disables streaming though? I didn't see a problem with creating multiple read-only file handles: it allows the file to be read out of order and lets different parts of the code focus on their own bit without having to be aware of what other parts are doing.
The point of streaming the file was to avoid the "load everything into memory" problem, which always rears its head at the least convenient moment.
I get that the current `StateSync` interface doesn't allow streaming loads for anything except snapshots, but I wanted to keep things flexible at this layer since I anticipate `StateSync` being improved in the future.
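For illustration, the handle-per-field idea looks roughly like this sketch (not the code in this PR; the helper name is made up):

```python
from pathlib import Path

import json_stream


def stream_top_level_field(path: Path, field: str):
    # A fresh read-only handle plus a transient (streaming) parser per
    # top-level key means "snapshots" and "environments" can be consumed
    # independently and out of order, without loading the whole file.
    handle = path.open("r", encoding="utf8")  # caller closes when done
    data = json_stream.load(handle)  # streaming/transient by default
    return handle, data[field]


# e.g. two independent cursors into the same state file:
# snap_handle, snapshots = stream_top_level_field(Path("state.json"), "snapshots")
# env_handle, environments = stream_top_level_field(Path("state.json"), "environments")
```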
This PR implements the ability to export the state database to a file and import it back.

The state export file format is a `json` file. I tried to implement a streaming interface via the `StateStream` abstraction and the use of the `json_stream` library. The goal is to be able to dump large projects without loading everything into memory and crashing with an OOM.

In terms of version compatibility, there is a hard requirement to use the same version of SQLMesh to load the state as was used to dump it. This greatly simplifies the implementation and ensures our Pydantic model definitions will always be compatible. Guidance is included in the documentation on how to upgrade an older state file to be compatible with a new version of SQLMesh.
State export:

State import:
