Skip to content

Commit

Permalink
fix(migration): Ensure the paginated update is deterministic
Browse files: browse the repository at this point in the history
  • Loading branch information
john-bodley committed Oct 12, 2022
1 parent bd3166b commit f8f44a5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
31 changes: 20 additions & 11 deletions superset/migrations/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,31 @@ def paginated_update(
"""
Update models in small batches so we don't have to load everything in memory.
"""
start = 0
count = query.count()

total = query.count()
processed = 0
session: Session = inspect(query).session
result = session.execute(query)

if print_page_progress is None or print_page_progress is True:
print_page_progress = lambda current, total: print(
f" {current}/{total}", end="\r"
print_page_progress = lambda processed, total: print(
f" {processed}/{total}", end="\r"
)
while start < count:
end = min(start + batch_size, count)
for obj in query[start:end]:
yield obj
session.merge(obj)

while True:
rows = result.fetchmany(batch_size)

if not rows:
break

for row in rows:
yield row[0]

session.commit()
processed += len(rows)

if print_page_progress:
print_page_progress(end, count)
start += batch_size
print_page_progress(processed, total)


def try_load_json(data: Optional[str]) -> Dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def upgrade():
state["anchor"] = state["hash"]
del state["hash"]
entry.value = pickle.dumps(value)
session.commit()


def downgrade():
Expand All @@ -87,5 +86,3 @@ def downgrade():
state["hash"] = state["anchor"]
del state["anchor"]
entry.value = pickle.dumps(value)
session.merge(entry)
session.commit()

0 comments on commit f8f44a5

Please sign in to comment.