perf: Optimize dataframe-series alignment on axis=1 (#732)
* perf: Use transpose cache to align series on axis=1

* refactor df binary op alignment

* cleanup assertion, comments
TrevorBergeron authored May 31, 2024
1 parent 25d049c commit 3d39221
Showing 6 changed files with 313 additions and 167 deletions.
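For context, the operation this change targets is a DataFrame-Series binary op aligned on axis=1, where the Series index is matched against the frame's column labels. A minimal sketch, assuming the bigframes.pandas API mirrors pandas here:

    import pandas as pd
    import bigframes.pandas as bpd

    df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})
    # Align on columns: label "a" pairs with 10, "b" with 20.
    result = df - pd.Series([10, 20], index=["a", "b"])

The diff below makes this cheaper in two ways: a DataFrame built from local data caches its transpose up front, and a plain pandas Series on the right-hand side is inlined as constants instead of joined.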
240 changes: 227 additions & 13 deletions bigframes/core/blocks.py
@@ -155,7 +155,13 @@ def __init__(
         self._transpose_cache: Optional[Block] = transpose_cache
 
     @classmethod
-    def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
+    def from_local(
+        cls,
+        data: pd.DataFrame,
+        session: bigframes.Session,
+        *,
+        cache_transpose: bool = True,
+    ) -> Block:
         # Assumes caller has already converted datatypes to bigframes ones.
         pd_data = data
         column_labels = pd_data.columns
@@ -169,12 +175,21 @@ def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
         pd_data = pd_data.reset_index(names=index_ids)
         as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
         array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
-        return cls(
+        block = cls(
             array_value,
             column_labels=column_labels,
             index_columns=index_ids,
             index_labels=index_labels,
         )
+        if cache_transpose:
+            try:
+                # this cache will help when aligning on axis=1
+                block = block.with_transpose_cache(
+                    cls.from_local(data.T, session, cache_transpose=False)
+                )
+            except Exception:
+                pass
+        return block
 
     @property
     def index(self) -> BlockIndexProperties:
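A rough sketch of what the new cache buys, using names from this diff (illustrative usage, not a public API):

    # from_local now eagerly builds a Block for the transposed local frame,
    # so a later axis=1 alignment can reuse it instead of pivoting.
    pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    block = Block.from_local(pdf, session)  # also runs from_local(pdf.T, ...)
    # cache_transpose=False on the inner call stops the recursion there;
    # otherwise caching pdf.T would in turn try to cache (pdf.T).T, and so on.

The broad try/except is deliberate: the cache is purely an optimization, so any failure to build it (for example, a transpose that mixes incompatible dtypes) silently falls back to the uncached path.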
@@ -724,12 +739,18 @@ def with_column_labels(
                 f"The column labels size {len(label_list)} should equal the "
                 f"value columns size {len(self.value_columns)}."
             )
-        return Block(
+        block = Block(
             self._expr,
             index_columns=self.index_columns,
             column_labels=label_list,
             index_labels=self.index.names,
         )
+        singleton_label = list(value)[0] if len(list(value)) == 1 else None
+        if singleton_label is not None and self._transpose_cache is not None:
+            new_cache, label_id = self._transpose_cache.create_constant(singleton_label)
+            new_cache = new_cache.set_index([label_id])
+            block = block.with_transpose_cache(new_cache)
+        return block
 
     def with_transpose_cache(self, transposed: Block):
         return Block(
@@ -1947,6 +1968,153 @@ def merge(
 
         return Block(expr, index_columns=index_columns, column_labels=labels)
 
+    def _align_both_axes(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        # Join rows
+        aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
+        # Join the column schemas.
+        # Indexers will be None for an exact match.
+        if self.column_labels.equals(other.column_labels):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.column_labels, how="outer", return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(get_column_left[self.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.free_var(get_column_right[other.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+
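The indexer handling relies on the pandas.Index.join contract: it returns None indexers when no reindexing is needed, and -1 entries where a label has no match. For example:

    import pandas as pd

    left = pd.Index(["a", "b", "c"])
    right = pd.Index(["b", "c", "d"])
    joined, lidx, ridx = left.join(right, how="outer", return_indexers=True)
    # joined -> Index(['a', 'b', 'c', 'd'])
    # lidx   -> [ 0,  1,  2, -1]
    # ridx   -> [-1,  0,  1,  2]

The -1 entries are exactly what the lookup lambdas translate into ex.const(None), so unmatched columns receive NULL inputs.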
+    def _align_axis_0(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        assert len(other.value_columns) == 1
+        aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
+
+        series_column_id = other.value_columns[0]
+        inputs = tuple(
+            (
+                ex.free_var(get_column_left[col]),
+                ex.free_var(get_column_right[series_column_id]),
+            )
+            for col in self.value_columns
+        )
+        return aligned_block, self.column_labels, inputs
+
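This is the familiar pandas axis=0 (index) alignment: after joining on row labels, the single series column is paired with every frame column. For comparison, in plain pandas:

    import pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}, index=["x", "y"])
    s = pd.Series([10, 20], index=["x", "y"])
    df.sub(s, axis=0)
    #     a   b
    # x  -9  -7
    # y -18 -16

Each (frame column, series column) pair becomes one expression tuple, so the series expression simply repeats once per frame column.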
+    def _align_series_block_axis_1(
+        self, other: Block, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        assert len(other.value_columns) == 1
+        if other._transpose_cache is None:
+            raise ValueError(
+                "Wrong align method: this approach requires a transpose cache"
+            )
+
+        # Join rows
+        aligned_block, (get_column_left, get_column_right) = join_with_single_row(
+            self, other.transpose()
+        )
+        # Join the column schemas.
+        # Indexers will be None for an exact match.
+        if self.column_labels.equals(other.transpose().column_labels):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.transpose().column_labels, how=how, return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(get_column_left[self.value_columns[index]])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.free_var(
+                get_column_right[other.transpose().value_columns[index]]
+            )
+            if index != -1
+            else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+
+    def _align_pd_series_axis_1(
+        self, other: pd.Series, how: str
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+        if self.column_labels.equals(other.index):
+            columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
+        else:
+            if not (self.column_labels.is_unique and other.index.is_unique):
+                raise ValueError("Cannot align non-unique indices")
+            columns, lcol_indexer, rcol_indexer = self.column_labels.join(
+                other.index, how=how, return_indexers=True
+            )
+        lcol_indexer = (
+            lcol_indexer if (lcol_indexer is not None) else range(len(columns))
+        )
+        rcol_indexer = (
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns))
+        )
+
+        left_input_lookup = (
+            lambda index: ex.free_var(self.value_columns[index])
+            if index != -1
+            else ex.const(None)
+        )
+        right_input_lookup = (
+            lambda index: ex.const(other.iloc[index]) if index != -1 else ex.const(None)
+        )
+
+        left_inputs = [left_input_lookup(i) for i in lcol_indexer]
+        right_inputs = [right_input_lookup(i) for i in rcol_indexer]
+        return self, columns, tuple(zip(left_inputs, right_inputs))
+
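Because other here is an in-memory pandas Series, no join is needed at all: matched labels are inlined as scalar constants and the block's rows are untouched, which is why the method returns self as the aligned block. Roughly, for frame columns ["a", "b", "c"]:

    # Illustrative: s = pd.Series([10, 20], index=["a", "b"]), how="outer".
    # Generated (left, right) input pairs:
    #   (col_a, ex.const(10)), (col_b, ex.const(20)), (col_c, ex.const(None))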
+    def _apply_binop(
+        self,
+        op: ops.BinaryOp,
+        inputs: Sequence[Tuple[ex.Expression, ex.Expression]],
+        labels: pd.Index,
+        reverse: bool = False,
+    ) -> Block:
+        block = self
+        binop_result_ids = []
+        for left_input, right_input in inputs:
+            expr = (
+                op.as_expr(right_input, left_input)
+                if reverse
+                else op.as_expr(left_input, right_input)
+            )
+            block, result_col_id = block.project_expr(expr)
+            binop_result_ids.append(result_col_id)
+
+        return block.select_columns(binop_result_ids).with_column_labels(labels)
+
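The reverse flag swaps operand order per element, letting reflected operations share this helper. A hypothetical lowering, using names from this diff:

    # df.__rsub__(other) on axis=1 could lower to something like:
    #   block._apply_binop(ops.sub_op, inputs, labels, reverse=True)
    # so each result column computes right - left instead of left - right.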
     def join(
         self,
         other: Block,
@@ -2268,15 +2436,6 @@ def column_ids(self) -> Sequence[str]:
         """Column(s) to use as row labels."""
         return self._block._index_columns
 
-    def __repr__(self) -> str:
-        """Converts an Index to a string."""
-        # TODO(swast): Add a timeout here? If the query is taking a long time,
-        # maybe we just print the job metadata that we have so far?
-        # TODO(swast): Avoid downloading the whole index by using job
-        # metadata, like we do with DataFrame.
-        preview = self.to_pandas()
-        return repr(preview)
-
     def to_pandas(self) -> pd.Index:
         """Executes deferred operations and downloads the results."""
         if len(self.column_ids) == 0:
@@ -2371,6 +2530,61 @@ def join_indexless(
     )
 
 
+def join_with_single_row(
+    left: Block,
+    single_row_block: Block,
+) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
+    """
+    Special join case where the other block has exactly one row.
+    This property is not validated; the caller must not pass a multi-row block.
+    Preserves the index of the left block, ignoring the index of the other.
+    """
+    left_expr = left.expr
+    # Ignore the right block's index by selecting only its value columns.
+    right_expr = single_row_block.expr.select_columns(single_row_block.value_columns)
+    left_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.LEFT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in left_expr.column_ids
+    ]
+    right_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.RIGHT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in right_expr.column_ids  # index columns already dropped above
+    ]
+
+    join_def = join_defs.JoinDefinition(
+        conditions=(),
+        mappings=(*left_mappings, *right_mappings),
+        type="cross",
+    )
+    combined_expr = left_expr.join(
+        right_expr,
+        join_def=join_def,
+    )
+    get_column_left = join_def.get_left_mapping()
+    get_column_right = join_def.get_right_mapping()
+    # Keep the left block's index columns under their post-join ids.
+    index_cols_post_join = [get_column_left[id] for id in left.index_columns]
+
+    block = Block(
+        combined_expr,
+        index_columns=index_cols_post_join,
+        column_labels=left.column_labels.append(single_row_block.column_labels),
+        index_labels=[left.index.name],
+    )
+    return (
+        block,
+        (get_column_left, get_column_right),
+    )
+
+
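The cross join against a one-row block is the broadcasting trick: with no join conditions, each of the N left rows pairs with the single right row, appending the right-hand values to every row while leaving the left side's rows intact. The same shape in plain pandas:

    import pandas as pd

    left = pd.DataFrame({"a": [1, 2, 3]})
    row = pd.DataFrame({"b": [10], "c": [20]})
    left.merge(row, how="cross")
    #    a   b   c
    # 0  1  10  20
    # 1  2  10  20
    # 2  3  10  20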
 def join_mono_indexed(
     left: Block,
     right: Block,
@@ -2558,7 +2772,7 @@ def coalesce_columns(
 ) -> Tuple[core.ArrayValue, Sequence[str]]:
     result_ids = []
     for left_id, right_id in zip(left_ids, right_ids):
-        if how == "left" or how == "inner":
+        if how == "left" or how == "inner" or how == "cross":
             result_ids.append(left_id)
             expr = expr.drop_columns([right_id])
         elif how == "right":
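A cross join has no equality conditions, so there are no key column pairs to genuinely coalesce; keeping the left ids, as for "left" and "inner", preserves the left block's index columns, which is what join_with_single_row above depends on.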
2 changes: 1 addition & 1 deletion bigframes/core/groupby/__init__.py
@@ -560,7 +560,7 @@ def size(self) -> series.Series:
             by_column_ids=self._by_col_ids,
             dropna=self._dropna,
         )
-        return series.Series(agg_block, name=self._value_name)
+        return series.Series(agg_block.with_column_labels([self._value_name]))
 
     def skew(self, *args, **kwargs) -> series.Series:
         block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
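Routing the name through with_column_labels, rather than the Series constructor, sends it down the same labeling path as every other block, including the new transpose-cache bookkeeping for single-label blocks. Illustrative usage, assuming the pandas-mirroring API:

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"k": ["x", "x", "y"], "v": [1, 2, 3]})
    sizes = df.groupby("k")["v"].size()  # result keeps the name "v"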
