Skip to content

Commit

Permalink
Fix multipartitions run length encoding error (#12329)
Browse files Browse the repository at this point in the history
Seen here:
https://app.datadoghq.com/apm/trace/3303427780901995569?spanID=13139862780265732778&spanViewType=errors

This PR is mostly just snapshot updates and is hopefully fairly fast to
review!

The root cause is that the first dimension has no partitions. This can
happen with an empty static partitions definition, or with a time window
partitions definition whose start time is in the future.

This PR adds a check for this case, exiting early with an empty list of
ranges if the primary dimension has no partition keys. The same check also
covers a secondary dimension with no partition keys, so the run-length
encoding still succeeds in that case.
  • Loading branch information
clairelin135 committed Feb 15, 2023
1 parent 370093d commit 8a42a96
Show file tree
Hide file tree
Showing 6 changed files with 18,091 additions and 14,120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,17 @@ def get_2d_run_length_encoded_materialized_partitions(
unevaluated_idx = 0
range_start_idx = 0 # pointer to first dim1 partition with same dim2 materialization status

if len(dim1_keys) == 0 or len(secondary_dim.partitions_def.get_partition_keys()) == 0:
return GrapheneMultiPartitions(ranges=[], primaryDimensionName=primary_dim.name)

while unevaluated_idx <= len(dim1_keys):
if (
unevaluated_idx == len(dim1_keys)
or dim2_partition_subset_by_dim1[dim1_keys[unevaluated_idx]]
!= dim2_partition_subset_by_dim1[dim1_keys[range_start_idx]]
):
# Add new multipartition range if we've reached the end of the dim1 keys or if the
# second dimension subset is different than the previous dim1 key
if len(dim2_partition_subset_by_dim1[dim1_keys[range_start_idx]]) > 0:
# Do not add to materialized_2d_ranges if the dim2 partition subset is empty
start_key = dim1_keys[range_start_idx]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,19 @@ def multipartitions_2(multipartitions_1):
return multipartitions_1


# Multi-partitions definition whose two dimensions, "a" and "b", are both
# static partitions definitions constructed with an empty list of keys.
no_partitions_multipartitions_def = MultiPartitionsDefinition(
{
"a": StaticPartitionsDefinition([]),
"b": StaticPartitionsDefinition([]),
}
)


@asset(partitions_def=no_partitions_multipartitions_def)
def no_multipartitions_1():
    """Asset partitioned by ``no_partitions_multipartitions_def``; returns the constant 1."""
    return 1


# For now the only way to add assets to repositories is via AssetGroup
# When AssetGroup is removed, these assets should be added directly to repository_with_named_groups
named_groups_job = AssetGroup(
Expand Down Expand Up @@ -1956,6 +1969,12 @@ def define_asset_jobs():
AssetSelection.assets(multipartitions_1, multipartitions_2),
partitions_def=multipartitions_def,
),
no_multipartitions_1,
define_asset_job(
"no_multipartitions_job",
AssetSelection.assets(no_multipartitions_1),
partitions_def=no_partitions_multipartitions_def,
),
SourceAsset("diamond_source"),
fresh_diamond_top,
fresh_diamond_left,
Expand Down

0 comments on commit 8a42a96

Please sign in to comment.