Skip to content

Commit

Permalink
Improve arrow_to_dataframe for large amounts of columns (#73)
Browse files Browse the repository at this point in the history
Co-authored-by: Justin Gilmer <16173348+justinGilmer@users.noreply.github.com>
  • Loading branch information
jleifnf and justinGilmer committed Feb 7, 2024
1 parent 7e69f1d commit bdd7b68
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions btrdb/transformers.py
Expand Up @@ -181,6 +181,19 @@ def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame:
-----
This method is available for commercial customers with arrow-enabled servers.
"""

def _rename(col_name, col_names_map):
if col_name == "time":
return col_name
col_name_parts = col_name.split("/")
if len(col_name_parts) == 1:
uuid = col_name_parts[0]
new_col_name = col_names_map[uuid]
elif len(col_name_parts) == 2:
uuid, _ = col_name_parts
new_col_name = col_name.replace(uuid, col_names_map[uuid])
return new_col_name

if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
"arrow_to_dataframe requires an arrow-enabled BTrDB server."
Expand All @@ -206,35 +219,30 @@ def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame:
if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name
# format is: uuid/stat_type
tmp_table = streamset.arrow_values()
tmp = streamset.arrow_values()
# assume time col is the first column
time_col = tmp.column_names[0]

tmp_df = tmp.to_pandas(
date_as_object=False,
types_mapper=pd.ArrowDtype,
split_blocks=True,
self_destruct=True,
).set_index(time_col)
tmp_df.index.name = time_col
col_names = _stream_names(streamset, name_callable)
col_names_map = {str(s.uuid): c for s, c in zip(streamset, col_names)}
updated_table_columns = []
# assume time col is the first column
time_col = tmp_table.column_names[0]
for old_col in tmp_table.column_names:
if old_col == time_col:
updated_table_columns.append(time_col)
else:
for uu, new_name in col_names_map.items():
if uu in old_col:
updated_table_columns.append(old_col.replace(uu, new_name))
else:
continue
tmp_table = tmp_table.rename_columns(updated_table_columns)
tmp_df.rename(
columns=lambda col_name: _rename(col_name, col_names_map), inplace=True
)
if not streamset.allow_window:
usable_cols = []
for column_str in tmp_table.column_names:
for column_str in tmp_df.columns:
for agg_name in agg:
if agg_name in column_str:
usable_cols.append(column_str)
tmp = tmp_table.select([time_col, *usable_cols])
else:
tmp = tmp_table
tmp_df = tmp.to_pandas(date_as_object=False, types_mapper=pd.ArrowDtype).set_index(
time_col
)
tmp_df.index.name = time_col
tmp_df = tmp_df.loc[:, usable_cols]

return tmp_df


Expand Down

0 comments on commit bdd7b68

Please sign in to comment.