Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[PERF] enable metadata preservation across materialization points #2216

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]:
if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
yield from self._result.values()
for mat_result in self._result.values():
yield mat_result.partition()

else:
# Execute the dataframe in a streaming fashion.
Expand All @@ -238,8 +239,9 @@ def _populate_preview(self) -> None:
)
if preview_partition_invalid:
preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})

preview_results = LocalPartitionSet()
for i, part in enumerate(preview_parts):
preview_results.set_partition_from_table(i, part)
preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
Expand Down Expand Up @@ -314,7 +316,10 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
if not parts:
raise ValueError("Can't create a DataFrame from an empty list of tables.")

result_pset = LocalPartitionSet({i: part for i, part in enumerate(parts)})
result_pset = LocalPartitionSet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a LocalPartitionSet.from_tables()?


for i, part in enumerate(parts):
result_pset.set_partition_from_table(i, part)

context = get_context()
cache_entry = context.runner().put_partition_set_into_cache(result_pset)
Expand Down
11 changes: 3 additions & 8 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from daft.logical.schema import Schema
from daft.runners.partitioning import (
MaterializedResult,
PartialPartitionMetadata,
PartitionT,
)
from daft.table.micropartition import MicroPartition
Expand Down Expand Up @@ -68,16 +67,12 @@ def _stage_id_counter():


def partition_read(
    materialized_results: Iterator[MaterializedResult[PartitionT]],
) -> InProgressPhysicalPlan[PartitionT]:
    """Instantiate a (no-op) physical plan from existing materialized partitions.

    Each materialized result already carries both its partition and its
    metadata, so the metadata is forwarded to the task builder instead of
    being recomputed (this is the metadata-preservation goal of this change).

    Args:
        materialized_results: iterator of already-materialized results to
            wrap as partition tasks.

    Yields:
        One PartitionTaskBuilder per input result.
    """
    yield from (
        PartitionTaskBuilder[PartitionT](inputs=[mat_result.partition()], partial_metadatas=[mat_result.metadata()])
        for mat_result in materialized_results
    )


Expand Down
3 changes: 2 additions & 1 deletion daft/io/file_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def from_glob_path(path: str, io_config: Optional[IOConfig] = None) -> DataFrame
runner_io = context.runner().runner_io()
file_infos = runner_io.glob_paths_details([path], io_config=io_config)
file_infos_table = MicroPartition._from_pytable(file_infos.to_table())
partition = LocalPartitionSet({0: file_infos_table})
partition = LocalPartitionSet()
partition.set_partition_from_table(0, file_infos_table)
cache_entry = context.runner().put_partition_set_into_cache(partition)
size_bytes = partition.size_bytes()
assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
Expand Down
6 changes: 3 additions & 3 deletions daft/runners/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,18 @@ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
merged_partition = self._get_merged_vpartition()
return merged_partition.to_arrow(cast_tensors_to_ray_tensor_dtype)

def items(self) -> list[tuple[PartID, MaterializedResult[PartitionT]]]:
    """
    Returns all (partition id, materialized result) pairs in this PartitionSet,
    ordered by partition ID.
    """
    raise NotImplementedError()

def values(self) -> list[MaterializedResult[PartitionT]]:
    """Returns all materialized results in this PartitionSet, ordered by partition ID."""
    return [value for _, value in self.items()]

@abstractmethod
def get_partition(self, idx: PartID) -> MaterializedResult[PartitionT]:
    """Return the materialized result stored under partition id ``idx``."""
    raise NotImplementedError()

@abstractmethod
Expand Down
67 changes: 45 additions & 22 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from daft.runners import runner_io
from daft.runners.partitioning import (
MaterializedResult,
PartialPartitionMetadata,
PartID,
PartitionCacheEntry,
PartitionMetadata,
Expand All @@ -28,23 +29,27 @@
logger = logging.getLogger(__name__)


class LocalPartitionSet(PartitionSet[MicroPartition]):
    """In-memory partition set for the local (Python) runner.

    Stores MaterializedResult objects rather than bare MicroPartitions so that
    partition metadata survives across materialization points.
    """

    # Maps partition id -> materialized result (partition + its metadata).
    _partitions: dict[PartID, MaterializedResult[MicroPartition]]

    def __init__(self) -> None:
        super().__init__()
        self._partitions = {}

def items(self) -> list[tuple[PartID, MaterializedResult[MicroPartition]]]:
    """Return (partition id, materialized result) pairs sorted by partition id."""
    return sorted(self._partitions.items())

def _get_merged_vpartition(self) -> MicroPartition:
    """Concatenate all partitions into a single MicroPartition.

    Requires partition ids to be contiguous and to start at 0 so that
    concatenation order matches partition order.
    """
    ids_and_partitions = self.items()
    assert ids_and_partitions[0][0] == 0
    assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
    return MicroPartition.concat([part.partition() for id, part in ids_and_partitions])

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
for _, mat_result in ids_and_partitions:
part: MicroPartition = mat_result.partition()
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
Expand All @@ -54,11 +59,14 @@ def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
preview_parts.append(part)
return preview_parts

def get_partition(self, idx: PartID) -> MaterializedResult[MicroPartition]:
    """Return the materialized result for partition id ``idx`` (KeyError if absent)."""
    return self._partitions[idx]

def set_partition(self, idx: PartID, part: MaterializedResult[MicroPartition]) -> None:
    """Store a materialized result whole, preserving its metadata alongside the partition."""
    self._partitions[idx] = part

def set_partition_from_table(self, idx: PartID, part: MicroPartition) -> None:
    """Wrap a raw MicroPartition in a PyMaterializedResult, computing its metadata eagerly."""
    self._partitions[idx] = PyMaterializedResult(part, PartitionMetadata.from_table(part))

def delete_partition(self, idx: PartID) -> None:
    """Remove partition id ``idx`` from this set (KeyError if absent)."""
    del self._partitions[idx]
Expand All @@ -67,10 +75,10 @@ def has_partition(self, idx: PartID) -> bool:
return idx in self._partitions

def __len__(self) -> int:
    """Total row count across all partitions.

    NOTE(review): this reaches for the concrete partition for its length;
    metadata may already carry num_rows, but for local MicroPartitions the
    direct len() is cheap and always accurate.
    """
    return sum(len(partition.partition()) for partition in self._partitions.values())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. After this PR we actually have metadata, and don't necessarily need to reach for the partition to get the length...

Would it not be possible/safe to let MaterializedResult.__len__ delegate appropriately between the metadata and the partition to get the length of the partition?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it doesn't really matter given that this is a local MicroPartition though


def size_bytes(self) -> int | None:
size_bytes_ = [partition.size_bytes() for partition in self._partitions.values()]
size_bytes_ = [partition.partition().size_bytes() for partition in self._partitions.values()]
size_bytes: list[int] = [size for size in size_bytes_ if size is not None]
if len(size_bytes) != len(size_bytes_):
return None
Expand Down Expand Up @@ -126,7 +134,7 @@ def runner_io(self) -> PyRunnerIO:
def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
results = list(self.run_iter(builder))

result_pset = LocalPartitionSet({})
result_pset = LocalPartitionSet()
for i, result in enumerate(results):
result_pset.set_partition(i, result)

Expand All @@ -144,6 +152,7 @@ def run_iter(

# Optimize the logical plan.
builder = builder.optimize()

# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
Expand Down Expand Up @@ -209,8 +218,10 @@ def _physical_plan_to_partitions(
)
):
logger.debug("Running task synchronously in main thread: %s", next_step)
partitions = self.build_partitions(next_step.instructions, *next_step.inputs)
next_step.set_result([PyMaterializedResult(partition) for partition in partitions])
materialized_results = self.build_partitions(
next_step.instructions, next_step.inputs, next_step.partial_metadatas
)
next_step.set_result(materialized_results)

else:
# Submit the task for execution.
Expand All @@ -220,7 +231,10 @@ def _physical_plan_to_partitions(
pbar.mark_task_start(next_step)

future = thread_pool.submit(
self.build_partitions, next_step.instructions, *next_step.inputs
self.build_partitions,
next_step.instructions,
next_step.inputs,
next_step.partial_metadatas,
)
# Register the inflight task and resources used.
future_to_task[future] = next_step.id()
Expand All @@ -239,12 +253,13 @@ def _physical_plan_to_partitions(
done_id = future_to_task.pop(done_future)
del inflight_tasks_resources[done_id]
done_task = inflight_tasks.pop(done_id)
partitions = done_future.result()
materialized_results = done_future.result()

pbar.mark_task_done(done_task)

logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions))
done_task.set_result([PyMaterializedResult(partition) for partition in partitions])
logger.debug("Task completed: %s -> <%s partitions>", done_id, len(materialized_results))

done_task.set_result(materialized_results)

if next_step is None:
next_step = next(plan)
Expand Down Expand Up @@ -278,17 +293,23 @@ def _can_admit_task(self, resource_request: ResourceRequest, inflight_resources:
return all((cpus_okay, gpus_okay, memory_okay))

@staticmethod
def build_partitions(
    instruction_stack: list[Instruction],
    partitions: list[MicroPartition],
    final_metadata: list[PartialPartitionMetadata],
) -> list[MaterializedResult[MicroPartition]]:
    """Run an instruction stack over the input partitions and materialize results.

    Each output's metadata is computed from the resulting table and merged
    with the task's expected partial metadata, so downstream consumers see
    the richest metadata available.

    NOTE(review): the final ``partitions`` and ``final_metadata`` are assumed
    to be the same length -- zip() silently truncates on mismatch; a
    list[tuple[MicroPartition, PartialPartitionMetadata]] parameter would
    enforce this, as suggested in review.
    """
    for instruction in instruction_stack:
        partitions = instruction.run(partitions)
    return [
        PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
        for part, partial in zip(partitions, final_metadata)
    ]


@dataclass
class PyMaterializedResult(MaterializedResult[MicroPartition]):
    """A materialized MicroPartition plus (optionally lazy) metadata.

    Not frozen: ``metadata()`` caches its computed PartitionMetadata back
    into ``_metadata`` on first access.
    """

    _partition: MicroPartition
    _metadata: PartitionMetadata | None = None

    def partition(self) -> MicroPartition:
        """Return the underlying MicroPartition."""
        return self._partition
Expand All @@ -297,7 +318,9 @@ def vpartition(self) -> MicroPartition:
return self._partition

def metadata(self) -> PartitionMetadata:
    """Return this partition's metadata, computing and caching it on first call."""
    if self._metadata is None:
        self._metadata = PartitionMetadata.from_table(self._partition)
    return self._metadata

def cancel(self) -> None:
    """No-op: a locally materialized (synchronous) result cannot be cancelled."""
    return None
Expand Down
Loading
Loading