Chunk calls to DBT based on string length #1354

Closed
elementary/operations/upload_source_freshness.py (34 changes: 25 additions & 9 deletions)
@@ -1,5 +1,7 @@
 import json
 import os
+from itertools import accumulate, groupby
+from operator import itemgetter
 from pathlib import Path
 
 import click
@@ -10,6 +12,8 @@
 from elementary.monitor import dbt_project_utils
 from elementary.utils.ordered_yaml import OrderedYaml
 
+MAX_SERIALISED_CHARACTER_LENGTH = 90_000
+
 
 class UploadSourceFreshnessOperation:
     def __init__(self, config: Config):
@@ -58,20 +62,32 @@ def upload_results(self, results: dict, metadata: dict):
                 f"Source freshness for invocation id {invocation_id} were already uploaded."
             )
 
-        chunk_size = 100
-        chunk_list = list(range(0, len(results), chunk_size))
+        for result in results:
+            result["metadata"] = metadata
+
+        serialised_argument_lengths = [len(json.dumps(record)) for record in results]
+
+        argument_splits = [
+            (running_total // MAX_SERIALISED_CHARACTER_LENGTH, record)
+            for running_total, record in zip(
+                accumulate(serialised_argument_lengths),
+                results,
+            )
+        ]
+
+        chunked_commands = {
+            chunk_id: [i[1] for i in chunk_tuple]
+            for chunk_id, chunk_tuple in groupby(argument_splits, key=itemgetter(0))
+        }
 
         upload_with_progress_bar = alive_it(
-            chunk_list, title="Uploading source freshness results"
+            chunked_commands.values(), title="Uploading source freshness results"
         )
-        for chunk in upload_with_progress_bar:
-            results_segment = results[chunk : chunk + chunk_size]
-
-            for result in results_segment:
-                result["metadata"] = metadata
+
+        for result_chunk in upload_with_progress_bar:
             dbt_runner.run_operation(
                 "elementary_cli.upload_source_freshness",
-                macro_args={"results": json.dumps(results_segment)},
+                macro_args={"results": json.dumps(result_chunk)},
                 quiet=True,
             )
 
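The change above replaces fixed-size chunks of 100 results with chunks bounded by serialised length: metadata is merged into each record first (so the measured size matches the payload actually sent), each record's JSON length is measured, a running total is built with accumulate, and integer division by MAX_SERIALISED_CHARACTER_LENGTH turns that total into a chunk id for groupby to split on. The sketch below reproduces the technique standalone; chunk_by_serialised_length and the demo records are illustrative names, not part of the elementary codebase.

import json
from itertools import accumulate, groupby
from operator import itemgetter

MAX_SERIALISED_CHARACTER_LENGTH = 90_000  # same threshold as the diff


def chunk_by_serialised_length(records):
    # Serialised size of each record on its own.
    lengths = [len(json.dumps(record)) for record in records]

    # Key each record by running_total // threshold. The running total only
    # grows, so the keys are non-decreasing, which is what groupby needs in
    # order to emit contiguous groups.
    keyed = [
        (running_total // MAX_SERIALISED_CHARACTER_LENGTH, record)
        for running_total, record in zip(accumulate(lengths), records)
    ]
    return [
        [record for _, record in group]
        for _, group in groupby(keyed, key=itemgetter(0))
    ]


# Six ~40k-character records land in three chunks of two, each chunk
# comfortably under the 90k threshold.
demo = [{"id": i, "payload": "x" * 40_000} for i in range(6)]
for chunk in chunk_by_serialised_length(demo):
    print(len(chunk), sum(len(json.dumps(r)) for r in chunk))

One caveat worth noting: the boundary is drawn where the running total crosses a multiple of the threshold, so a single record whose serialised form approaches or exceeds 90,000 characters still lands in one chunk, and that chunk can exceed the limit. The bound is approximate rather than strict.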