From bdd7b688292c1f2feae8f4b98c0984588fc73621 Mon Sep 17 00:00:00 2001 From: Jeff Lin <42981468+jleifnf@users.noreply.github.com> Date: Wed, 7 Feb 2024 14:39:33 -0800 Subject: [PATCH] Improve arrow_to_dataframe for large amounts of columns (#73) Co-authored-by: Justin Gilmer <16173348+justinGilmer@users.noreply.github.com> --- btrdb/transformers.py | 52 +++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/btrdb/transformers.py b/btrdb/transformers.py index 662dee0..0ff01fb 100644 --- a/btrdb/transformers.py +++ b/btrdb/transformers.py @@ -181,6 +181,19 @@ def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame: ----- This method is available for commercial customers with arrow-enabled servers. """ + + def _rename(col_name, col_names_map): + if col_name == "time": + return col_name + col_name_parts = col_name.split("/") + if len(col_name_parts) == 1: + uuid = col_name_parts[0] + new_col_name = col_names_map[uuid] + elif len(col_name_parts) == 2: + uuid, _ = col_name_parts + new_col_name = col_name.replace(uuid, col_names_map[uuid]) + return new_col_name + if not streamset._btrdb._ARROW_ENABLED: raise NotImplementedError( "arrow_to_dataframe requires an arrow-enabled BTrDB server." @@ -206,35 +219,30 @@ def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame: if not callable(name_callable): name_callable = lambda s: s.collection + "/" + s.name # format is: uuid/stat_type - tmp_table = streamset.arrow_values() + tmp = streamset.arrow_values() + # assume time col is the first column + time_col = tmp.column_names[0] + + tmp_df = tmp.to_pandas( + date_as_object=False, + types_mapper=pd.ArrowDtype, + split_blocks=True, + self_destruct=True, + ).set_index(time_col) + tmp_df.index.name = time_col col_names = _stream_names(streamset, name_callable) col_names_map = {str(s.uuid): c for s, c in zip(streamset, col_names)} - updated_table_columns = [] - # assume time col is the first column - time_col = tmp_table.column_names[0] - for old_col in tmp_table.column_names: - if old_col == time_col: - updated_table_columns.append(time_col) - else: - for uu, new_name in col_names_map.items(): - if uu in old_col: - updated_table_columns.append(old_col.replace(uu, new_name)) - else: - continue - tmp_table = tmp_table.rename_columns(updated_table_columns) + tmp_df.rename( + columns=lambda col_name: _rename(col_name, col_names_map), inplace=True + ) if not streamset.allow_window: usable_cols = [] - for column_str in tmp_table.column_names: + for column_str in tmp_df.columns: for agg_name in agg: if agg_name in column_str: usable_cols.append(column_str) - tmp = tmp_table.select([time_col, *usable_cols]) - else: - tmp = tmp_table - tmp_df = tmp.to_pandas(date_as_object=False, types_mapper=pd.ArrowDtype).set_index( - time_col - ) - tmp_df.index.name = time_col + tmp_df = tmp_df.loc[:, usable_cols] + return tmp_df