diff --git a/elementary/operations/upload_source_freshness.py b/elementary/operations/upload_source_freshness.py
index df80deaf4..387ed4155 100644
--- a/elementary/operations/upload_source_freshness.py
+++ b/elementary/operations/upload_source_freshness.py
@@ -1,5 +1,7 @@
 import json
 import os
+from itertools import accumulate, groupby
+from operator import itemgetter
 from pathlib import Path
 
 import click
@@ -10,6 +12,8 @@
 from elementary.monitor import dbt_project_utils
 from elementary.utils.ordered_yaml import OrderedYaml
 
+MAX_SERIALISED_CHARACTER_LENGTH = 90_000
+
 
 class UploadSourceFreshnessOperation:
     def __init__(self, config: Config):
@@ -58,20 +62,32 @@ def upload_results(self, results: dict, metadata: dict):
                 f"Source freshness for invocation id {invocation_id} were already uploaded."
             )
 
-        chunk_size = 100
-        chunk_list = list(range(0, len(results), chunk_size))
+        for result in results:
+            result["metadata"] = metadata
+
+        serialised_argument_lengths = [len(json.dumps(record)) for record in results]
+
+        argument_splits = [
+            (running_total // MAX_SERIALISED_CHARACTER_LENGTH, record)
+            for running_total, record in zip(
+                accumulate(serialised_argument_lengths),
+                results,
+            )
+        ]
+
+        chunked_commands = {
+            chunk_id: [i[1] for i in chunk_tuple]
+            for chunk_id, chunk_tuple in groupby(argument_splits, key=itemgetter(0))
+        }
+
         upload_with_progress_bar = alive_it(
-            chunk_list, title="Uploading source freshness results"
+            chunked_commands.values(), title="Uploading source freshness results"
         )
-        for chunk in upload_with_progress_bar:
-            results_segment = results[chunk : chunk + chunk_size]
-
-            for result in results_segment:
-                result["metadata"] = metadata
+        for result_chunk in upload_with_progress_bar:
             dbt_runner.run_operation(
                 "elementary_cli.upload_source_freshness",
-                macro_args={"results": json.dumps(results_segment)},
+                macro_args={"results": json.dumps(result_chunk)},
                 quiet=True,
             )
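
Note (reviewer sketch, not part of the diff): the change replaces fixed chunks of 100 records with chunks bounded by serialised payload size, built from itertools.accumulate and groupby. Below is a minimal, self-contained illustration of that technique; the helper name chunk_by_serialised_length and the example data are hypothetical.

import json
from itertools import accumulate, groupby
from operator import itemgetter

MAX_SERIALISED_CHARACTER_LENGTH = 90_000  # same budget as the change above


def chunk_by_serialised_length(records, limit=MAX_SERIALISED_CHARACTER_LENGTH):
    # Length of each record once serialised to JSON.
    lengths = [len(json.dumps(record)) for record in records]
    # Pair every record with a chunk id: the running total of lengths,
    # floor-divided by the budget. Ids are non-decreasing, so consecutive
    # records with the same id form one chunk.
    splits = [
        (running_total // limit, record)
        for running_total, record in zip(accumulate(lengths), records)
    ]
    return [
        [record for _, record in group]
        for _, group in groupby(splits, key=itemgetter(0))
    ]


# Three ~40k-character records against a 90k budget: the first two fit in
# one chunk, the third starts a new one.
records = [{"id": i, "blob": "x" * 40_000} for i in range(3)]
print([len(chunk) for chunk in chunk_by_serialised_length(records)])  # [2, 1]

Because the running totals only grow, chunk ids never decrease, so groupby yields each chunk exactly once; a chunk can exceed the budget only when a single record does on its own.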