Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add batch fetching of data version records #21798

Merged
merged 6 commits into from
May 13, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import TYPE_CHECKING

"""This module contains the execution context objects that are internal to the system.
Expand Down Expand Up @@ -35,8 +36,6 @@
if TYPE_CHECKING:
from dagster._core.execution.context.compute import StepExecutionContext

ASSET_RECORD_BATCH_SIZE = 100


@dataclass
class InputAssetVersionInfo:
Expand Down Expand Up @@ -148,14 +147,13 @@ def _fetch_input_asset_version_info(self, asset_keys: Sequence[AssetKey]) -> Non
)

def _fetch_asset_records(self, asset_keys: Sequence[AssetKey]) -> Dict[AssetKey, "AssetRecord"]:
batch_size = int(os.getenv("ASSET_RECORD_BATCH_SIZE", "100"))
prha marked this conversation as resolved.
Show resolved Hide resolved
asset_records_by_key = {}
to_fetch = asset_keys
while len(to_fetch):
for record in self._context.instance.get_asset_records(
to_fetch[:ASSET_RECORD_BATCH_SIZE]
):
for record in self._context.instance.get_asset_records(to_fetch[:batch_size]):
asset_records_by_key[record.asset_entry.asset_key] = record
to_fetch = to_fetch[ASSET_RECORD_BATCH_SIZE:]
to_fetch = to_fetch[batch_size:]

return asset_records_by_key

Expand Down