fix(migration): Ensure the paginated update is deterministic #21778
@@ -100,22 +100,31 @@ def paginated_update(
     """
     Update models in small batches so we don't have to load everything in memory.
     """
-    start = 0
-    count = query.count()
+
+    total = query.count()
+    processed = 0
     session: Session = inspect(query).session
+    result = session.execute(query)
+
     if print_page_progress is None or print_page_progress is True:
-        print_page_progress = lambda current, total: print(
-            f" {current}/{total}", end="\r"
+        print_page_progress = lambda processed, total: print(
+            f" {processed}/{total}", end="\r"
         )
-    while start < count:
-        end = min(start + batch_size, count)
-        for obj in query[start:end]:
-            yield obj
-            session.merge(obj)
+
+    while True:
+        rows = result.fetchmany(batch_size)
+
+        if not rows:
+            break
+
+        for row in rows:
+            yield row[0]
+
         session.commit()
+        processed += len(rows)
+
         if print_page_progress:
-            print_page_progress(end, count)
-        start += batch_size
+            print_page_progress(processed, total)
 
 
 def try_load_json(data: Optional[str]) -> Dict[str, Any]:

Review comment (on the added "while True:"): The option was to use either
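The new loop reads one executed result in fixed-size batches instead of re-slicing the query for every page. The sketch below shows the same loop shape under some assumptions: it uses the standard library's `sqlite3` cursor rather than a SQLAlchemy result so that it runs on its own, the helper name `iter_in_batches` and the `entry` table are hypothetical, and the per-batch commit is left out.

```python
import sqlite3
from typing import Callable, Iterator, Optional


def iter_in_batches(
    cursor: sqlite3.Cursor,
    batch_size: int,
    total: int,
    on_progress: Optional[Callable[[int, int], None]] = None,
) -> Iterator[tuple]:
    """Yield every row of an already-executed cursor, one batch at a time."""
    processed = 0
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:  # an empty batch means the result is exhausted
            break
        for row in rows:
            yield row
        processed += len(rows)
        if on_progress:
            on_progress(processed, total)


# Hypothetical data: seven rows, read three at a time.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entry (id INTEGER PRIMARY KEY, value TEXT)")
conn.executemany(
    "INSERT INTO entry (value) VALUES (?)", [(f"v{i}",) for i in range(7)]
)

progress = []
cursor = conn.execute("SELECT id, value FROM entry ORDER BY id")
ids = [
    row[0]
    for row in iter_in_batches(cursor, 3, 7, lambda p, t: progress.append((p, t)))
]
```

Because the statement is executed once, each row is visited exactly once and the progress counter advances by the number of rows actually fetched, not by a precomputed offset.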
@@ -66,7 +66,6 @@ def upgrade():
                 state["anchor"] = state["hash"]
                 del state["hash"]
             entry.value = pickle.dumps(value)
-            session.commit()
 
 
 def downgrade():

Review comment (on the removed "session.commit()"): There's no need to commit as the

@@ -87,5 +86,3 @@ def downgrade():
                 state["hash"] = state["anchor"]
                 del state["anchor"]
             entry.value = pickle.dumps(value)
-            session.merge(entry)
-            session.commit()

Review comment (on the removed "session.merge(entry)"): There's no need to merge the existing entry. See here for details:

Review comment (on the removed "session.commit()"): There's no need to commit as the
Review comment: There's no need to merge the record. The caller should handle this if required.
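In the first diff, `session.commit()` sits after the inner `for` loop of the generator, so it runs only once the caller has consumed every row of the batch. The sketch below illustrates that ordering in plain Python; `batched` and the `events` log are hypothetical stand-ins, not the project's code, and no database is involved.

```python
from typing import Iterator, List

events: List[str] = []


def batched(items: List[str], batch_size: int) -> Iterator[str]:
    """Hypothetical stand-in for a generator that commits after each batch."""
    for start in range(0, len(items), batch_size):
        for item in items[start : start + batch_size]:
            yield item  # control returns to the caller here
        events.append("commit")  # runs after the caller has consumed the batch


# The caller only mutates what it receives; it never commits itself.
for item in batched(["a", "b", "c"], batch_size=2):
    events.append(f"update {item}")
```

Under these assumptions the caller's changes to a batch always precede that batch's commit, which is consistent with dropping the explicit `session.commit()` calls from the migration bodies.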