From 8d9aa551658c124c630a329c1149de943c7b072b Mon Sep 17 00:00:00 2001 From: looselycoupled Date: Wed, 13 May 2020 15:27:52 -0400 Subject: [PATCH 1/3] adds testing --- btrdb/transformers.py | 124 ++++++++++++++++++++++++----- tests/btrdb/test_transformers.py | 131 ++++++++++++++++++++++++++++++- 2 files changed, 235 insertions(+), 20 deletions(-) diff --git a/btrdb/transformers.py b/btrdb/transformers.py index c6c3e35..35de5f1 100644 --- a/btrdb/transformers.py +++ b/btrdb/transformers.py @@ -18,21 +18,28 @@ import csv import contextlib from collections import OrderedDict +from warnings import warn ########################################################################## ## Helper Functions ########################################################################## +_STAT_PROPERTIES = ('min', 'mean', 'max', 'count', 'stddev') + def _get_time_from_row(row): for item in row: if item: return item.time raise Exception("Row contains no data") -def _stream_names(streamset): +def _stream_names(streamset, func): + """ + private convenience function to come up with proper final stream names + before sending a collection of streams (dataframe, etc.) back to the + user. + """ return tuple( - s.collection + "/" + s.name \ - for s in streamset._streams + func(s) for s in streamset._streams ) @@ -40,7 +47,7 @@ def _stream_names(streamset): ## Transform Functions ########################################################################## -def to_series(streamset, datetime64_index=True, agg="mean"): +def to_series(streamset, datetime64_index=True, agg="mean", name_callable=None): """ Returns a list of Pandas Series objects indexed by time @@ -55,14 +62,26 @@ def to_series(streamset, datetime64_index=True, agg="mean"): from. Must be one of "min", "mean", "max", "count", or "stddev". This argument is ignored if RawPoint values are passed into the function. + name_callable : lambda, default: lambda s: s.collection + "/" + s.name + Sprecify a callable that can be used to determine the series name given a + Stream object. + """ try: import pandas as pd except ImportError: raise ImportError("Please install Pandas to use this transformation function.") + # TODO: allow this at some future point + if agg == "all": + raise AttributeError("cannot use 'all' as aggregate at this time") + + if not callable(name_callable): + name_callable = lambda s: s.collection + "/" + s.name + + result = [] - stream_names = _stream_names(streamset) + stream_names = _stream_names(streamset, name_callable) for idx, output in enumerate(streamset.values()): times, values = [], [] @@ -82,7 +101,7 @@ def to_series(streamset, datetime64_index=True, agg="mean"): return result -def to_dataframe(streamset, columns=None, agg="mean"): +def to_dataframe(streamset, columns=None, agg="mean", name_callable=None): """ Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column. @@ -90,12 +109,17 @@ def to_dataframe(streamset, columns=None, agg="mean"): Parameters ---------- columns: sequence - column names to use for DataFrame + column names to use for DataFrame. Deprecated and not compatible with name_callable. agg : str, default: "mean" Specify the StatPoint field (e.g. aggregating function) to create the Series - from. Must be one of "min", "mean", "max", "count", or "stddev". This - argument is ignored if RawPoint values are passed into the function. + from. Must be one of "min", "mean", "max", "count", "stddev", or "all". This + argument is ignored if not using StatPoints. + + name_callable : lambda, default: lambda s: s.collection + "/" + s.name + Sprecify a callable that can be used to determine the series name given a + Stream object. This is not compatible with agg == "all" at this time + """ try: @@ -103,9 +127,33 @@ def to_dataframe(streamset, columns=None, agg="mean"): except ImportError: raise ImportError("Please install Pandas to use this transformation function.") - stream_names = _stream_names(streamset) - columns = columns if columns else ["time"] + list(stream_names) - return pd.DataFrame(to_dict(streamset,agg=agg), columns=columns).set_index("time") + # deprecation warning added in v5.8 + if columns: + warn("the columns argument is deprecated and will be removed in a future release", DeprecationWarning, stacklevel=2) + + # TODO: allow this at some future point + if agg == "all" and name_callable is not None: + raise AttributeError("cannot provide name_callable when using 'all' as aggregate at this time") + + # do not allow agg="all" with RawPoints + if agg == "all" and streamset.allow_window: + agg="" + + # default arg values + if not callable(name_callable): + name_callable = lambda s: s.collection + "/" + s.name + + + df = pd.DataFrame(to_dict(streamset,agg=agg)) + df = df.set_index("time") + + if agg == "all" and not streamset.allow_window: + stream_names = [[s.collection, s.name, prop] for s in streamset._streams for prop in _STAT_PROPERTIES] + df.columns=pd.MultiIndex.from_tuples(stream_names) + else: + df.columns = _stream_names(streamset, name_callable) + + return df def to_array(streamset, agg="mean"): @@ -126,6 +174,10 @@ def to_array(streamset, agg="mean"): except ImportError: raise ImportError("Please install Numpy to use this transformation function.") + # TODO: allow this at some future point + if agg == "all": + raise AttributeError("cannot use 'all' as aggregate at this time") + results = [] for points in streamset.values(): segment = [] @@ -138,7 +190,7 @@ def to_array(streamset, agg="mean"): return np.array(results) -def to_dict(streamset, agg="mean"): +def to_dict(streamset, agg="mean", name_callable=None): """ Returns a list of OrderedDict for each time code with the appropriate stream data attached. @@ -150,9 +202,17 @@ def to_dict(streamset, agg="mean"): keys. Must be one of "min", "mean", "max", "count", or "stddev". This argument is ignored if RawPoint values are passed into the function. + name_callable : lambda, default: lambda s: s.collection + "/" + s.name + Sprecify a callable that can be used to determine the series name given a + Stream object. + """ + if not callable(name_callable): + name_callable = lambda s: s.collection + "/" + s.name + data = [] - stream_names = _stream_names(streamset) + stream_names = _stream_names(streamset, name_callable) + for row in streamset.rows(): item = OrderedDict({ "time": _get_time_from_row(row), @@ -161,12 +221,16 @@ def to_dict(streamset, agg="mean"): if row[idx].__class__.__name__ == "RawPoint": item[col] = row[idx].value if row[idx] else None else: - item[col] = getattr(row[idx], agg) if row[idx] else None + if agg == "all": + for stat in _STAT_PROPERTIES: + item["{}-{}".format(col, stat)] = getattr(row[idx], stat) if row[idx] else None + else: + item[col] = getattr(row[idx], agg) if row[idx] else None data.append(item) return data -def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"): +def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean", name_callable=None): """ Saves stream data as a CSV file. @@ -187,8 +251,19 @@ def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"): Specify the StatPoint field (e.g. aggregating function) to return when limiting results. Must be one of "min", "mean", "max", "count", or "stddev". This argument is ignored if RawPoint values are passed into the function. + + name_callable : lambda, default: lambda s: s.collection + "/" + s.name + Sprecify a callable that can be used to determine the series name given a + Stream object. """ + # TODO: allow this at some future point + if agg == "all": + raise AttributeError("cannot use 'all' as aggregate at this time") + + if not callable(name_callable): + name_callable = lambda s: s.collection + "/" + s.name + @contextlib.contextmanager def open_path_or_file(path_or_file): if isinstance(path_or_file, str): @@ -203,7 +278,7 @@ def open_path_or_file(path_or_file): file_to_close.close() with open_path_or_file(fobj) as csvfile: - stream_names = _stream_names(streamset) + stream_names = _stream_names(streamset, name_callable) fieldnames = fieldnames if fieldnames else ["time"] + list(stream_names) writer = csv.DictWriter(csvfile, fieldnames=fieldnames, dialect=dialect) @@ -213,7 +288,7 @@ def open_path_or_file(path_or_file): writer.writerow(item) -def to_table(streamset, agg="mean"): +def to_table(streamset, agg="mean", name_callable=None): """ Returns string representation of the data in tabular form using the tabulate library. @@ -225,13 +300,24 @@ def to_table(streamset, agg="mean"): from. Must be one of "min", "mean", "max", "count", or "stddev". This argument is ignored if RawPoint values are passed into the function. + name_callable : lambda, default: lambda s: s.collection + "/" + s.name + Sprecify a callable that can be used to determine the column name given a + Stream object. + """ try: from tabulate import tabulate except ImportError: raise ImportError("Please install tabulate to use this transformation function.") - return tabulate(streamset.to_dict(agg=agg), headers="keys") + # TODO: allow this at some future point + if agg == "all": + raise AttributeError("cannot use 'all' as aggregate at this time") + + if not callable(name_callable): + name_callable = lambda s: s.collection + "/" + s.name + + return tabulate(streamset.to_dict(agg=agg, name_callable=name_callable), headers="keys") ########################################################################## diff --git a/tests/btrdb/test_transformers.py b/tests/btrdb/test_transformers.py index 5cef38d..998fe3f 100644 --- a/tests/btrdb/test_transformers.py +++ b/tests/btrdb/test_transformers.py @@ -91,6 +91,7 @@ def statpoint_streamset(): obj = StreamSet(streams) obj.rows = Mock(return_value=rows) obj.values = Mock(return_value=values) + obj.pointwidth = 20 return obj @@ -145,6 +146,10 @@ def statpoint_streamset(): class TestTransformers(object): + ########################################################################## + ## to_dict Tests + ########################################################################## + def test_to_dict(self, streamset): assert to_dict(streamset) == expected["to_dict"] @@ -157,6 +162,28 @@ def test_to_dict_statpoint(self, statpoint_streamset): ] assert to_dict(statpoint_streamset) == expected + def test_to_dict_statpoint_agg_count(self, statpoint_streamset): + expected = [ + {'time': 1500000000100000000, 'test/stream0': 10, 'test/stream1': 11, 'test/stream2': 10, 'test/stream3': 11}, + {'time': 1500000000300000000, 'test/stream0': 11, 'test/stream1': 10, 'test/stream2': 10, 'test/stream3': 11}, + {'time': 1500000000500000000, 'test/stream0': 10, 'test/stream1': 11, 'test/stream2': 10, 'test/stream3': 11}, + {'time': 1500000000700000000, 'test/stream0': 11, 'test/stream1': 10, 'test/stream2': 10, 'test/stream3': 11}, + ] + assert to_dict(statpoint_streamset, agg='count') == expected + + def test_to_dict_statpoint_agg_all(self, statpoint_streamset): + expected = [ + OrderedDict([('time', 1500000000100000000), ('test/stream0-min', 0), ('test/stream0-mean', 2.0), ('test/stream0-max', 2.5), ('test/stream0-count', 10), ('test/stream0-stddev', 1), ('test/stream1-min', 0), ('test/stream1-mean', 4.0), ('test/stream1-max', 4.5), ('test/stream1-count', 11), ('test/stream1-stddev', 1), ('test/stream2-min', 0), ('test/stream2-mean', 6.0), ('test/stream2-max', 6.5), ('test/stream2-count', 10), ('test/stream2-stddev', 1), ('test/stream3-min', 0), ('test/stream3-mean', 8.0), ('test/stream3-max', 8.5), ('test/stream3-count', 11), ('test/stream3-stddev', 2)]), + OrderedDict([('time', 1500000000300000000), ('test/stream0-min', 0), ('test/stream0-mean', 3.0), ('test/stream0-max', 3.5), ('test/stream0-count', 11), ('test/stream0-stddev', 1), ('test/stream1-min', 0), ('test/stream1-mean', 5.0), ('test/stream1-max', 5.5), ('test/stream1-count', 10), ('test/stream1-stddev', 1), ('test/stream2-min', 0), ('test/stream2-mean', 7.0), ('test/stream2-max', 7.5), ('test/stream2-count', 10), ('test/stream2-stddev', 1), ('test/stream3-min', 0), ('test/stream3-mean', 9.0), ('test/stream3-max', 9.5), ('test/stream3-count', 11), ('test/stream3-stddev', 2)]), + OrderedDict([('time', 1500000000500000000), ('test/stream0-min', 0), ('test/stream0-mean', 4.0), ('test/stream0-max', 4.5), ('test/stream0-count', 10), ('test/stream0-stddev', 1), ('test/stream1-min', 0), ('test/stream1-mean', 6.0), ('test/stream1-max', 6.5), ('test/stream1-count', 11), ('test/stream1-stddev', 1), ('test/stream2-min', 0), ('test/stream2-mean', 8.0), ('test/stream2-max', 8.5), ('test/stream2-count', 10), ('test/stream2-stddev', 1), ('test/stream3-min', 0), ('test/stream3-mean', 10.0), ('test/stream3-max', 10.5), ('test/stream3-count', 11), ('test/stream3-stddev', 2)]), + OrderedDict([('time', 1500000000700000000), ('test/stream0-min', 0), ('test/stream0-mean', 5.0), ('test/stream0-max', 5.5), ('test/stream0-count', 11), ('test/stream0-stddev', 1), ('test/stream1-min', 0), ('test/stream1-mean', 7.0), ('test/stream1-max', 7.5), ('test/stream1-count', 10), ('test/stream1-stddev', 1), ('test/stream2-min', 0), ('test/stream2-mean', 9.0), ('test/stream2-max', 9.5), ('test/stream2-count', 10), ('test/stream2-stddev', 1), ('test/stream3-min', 0), ('test/stream3-mean', 11.0), ('test/stream3-max', 11.5), ('test/stream3-count', 11), ('test/stream3-stddev', 2)]) + ] + assert to_dict(statpoint_streamset, agg='all') == expected + + ########################################################################## + ## to_array Tests + ########################################################################## + def test_to_array(self, streamset): """ asserts to_array of rawpoint returns multidimensional ndarry of values @@ -183,6 +210,10 @@ def test_to_array_statpoint(self, statpoint_streamset): assert (result[2] == np.array([2.0, 2.0, 2.0, 2.0, ])).all() assert (result[3] == np.array([3.0, 3.0, 3.0, 3.0, ])).all() + ########################################################################## + ## to_series Tests + ########################################################################## + def test_to_series(self, streamset): """ Asserts to_series produces correct results including series name @@ -199,6 +230,23 @@ def test_to_series(self, streamset): assert (result[idx] == expected[idx]).all() # verify data assert result[idx].name == expected_names[idx] # verify name + def test_to_series_name_lambda(self, streamset): + """ + assert to_dateframe uses name lambda + """ + columns = ["stream0", "stream1", "stream2", "stream3"] + result = streamset.to_series(name_callable=lambda s: s.name) + assert [s.name for s in result] == ['stream0', 'stream1', 'stream2', 'stream3'] + + + def test_to_series_ignores_rawpoint_with_agg(self, streamset): + """ + assert to_series ignores agg if RawPoints are used + """ + result = streamset.to_series(agg="mean") + assert len(result) == 4 + + def test_to_series_statpoint(self, statpoint_streamset): """ Asserts to_series produces correct results with statpoints @@ -241,6 +289,17 @@ def test_to_series_index_as_int(self, streamset): for series in result: assert series.index.dtype.name == 'int64' + def test_to_series_raises_on_agg_all(self, statpoint_streamset): + """ + asserts to_series raises error if using "all" as agg. + """ + with pytest.raises(AttributeError): + statpoint_streamset.to_series(agg="all") + + ########################################################################## + ## to_dataframe Tests + ########################################################################## + def test_to_dataframe(self, streamset): """ assert to_dateframe works on RawPoints @@ -250,6 +309,43 @@ def test_to_dataframe(self, streamset): df.set_index("time", inplace=True) assert to_dataframe(streamset).equals(df) + def test_to_dataframe_column_issues_warning(self, statpoint_streamset): + """ + assert to_dateframe with column argument issues warning + """ + columns = ["time", "test/stream0", "test/stream1", "test/stream2", "test/stream3"] + with pytest.deprecated_call(): + statpoint_streamset.to_dataframe(columns=columns) + + def test_to_dataframe_multindex(self, statpoint_streamset): + """ + assert to_dateframe agg=all creates columned multiindex + """ + df = statpoint_streamset.to_dataframe(agg="all") + assert len(df.columns.levels) == 3 + assert list(df.columns.levels[0]) == ['test'] + assert list(df.columns.levels[1]) == ['stream0', 'stream1', 'stream2', 'stream3'] + assert list(df.columns.levels[2]) == ['count', 'max', 'mean', 'min', 'stddev'] + + def test_to_dataframe_name_lambda(self, streamset): + """ + assert to_dateframe uses name lambda + """ + columns = ["stream0", "stream1", "stream2", "stream3"] + df = streamset.to_dataframe(name_callable=lambda s: s.name) + + assert len(columns) == len(df.columns) + for idx in range(len(columns)): + assert columns[idx] == df.columns[idx] + + def test_to_dataframe_rawpoint_with_agg(self, streamset): + """ + assert to_dateframe ignores agg if RawPoint + """ + df = streamset.to_dataframe(agg="all") + assert df.shape == (10,4) + + def test_to_dataframe_statpoints(self, statpoint_streamset): """ assert to_dateframe works on StatPoints @@ -280,6 +376,10 @@ def test_to_dataframe_agg(self, statpoint_streamset): df = statpoint_streamset.to_dataframe(agg="stddev") assert df["test/stream0"].tolist() == [1, 1, 1, 1] + ########################################################################## + ## to_csv Tests + ########################################################################## + def test_to_csv_as_path(self, streamset, tmpdir): path = os.path.join(tmpdir.dirname, "to_csv_test.csv") streamset.to_dict = Mock(return_value=expected["to_dict"]) @@ -310,13 +410,23 @@ def test_to_csv_as_stringio(self, streamset): item[k] = int(v) assert result == expected["to_dict"] + def test_to_csv_raises_on_agg_all(self, statpoint_streamset): + """ + asserts to_csv raises error if using "all" as agg. + """ + with pytest.raises(AttributeError): + statpoint_streamset.to_csv("tmp.txt", agg="all") + + ########################################################################## + ## to_table Tests + ########################################################################## + def test_to_table(self, streamset): """ asserts to_table returns formatted ASCII table """ assert to_table(streamset) == expected["table"] - def test_to_table(self, statpoint_streamset): """ asserts to_table handles statpoints with requested key @@ -336,3 +446,22 @@ def test_to_table(self, statpoint_streamset): 1500000000500000000 4.5 6.5 8.5 10.5 1500000000700000000 5.5 7.5 9.5 11.5""" assert to_table(statpoint_streamset, agg="max") == expected + + def test_to_table_name_callable(self, statpoint_streamset): + """ + asserts to_table handles name_callable + """ + expected = """ time stream0 stream1 stream2 stream3 +------------------- --------- --------- --------- --------- +1500000000100000000 2 4 6 8 +1500000000300000000 3 5 7 9 +1500000000500000000 4 6 8 10 +1500000000700000000 5 7 9 11""" + assert to_table(statpoint_streamset, name_callable=lambda s: s.name) == expected + + def test_to_table_raises_on_agg_all(self, statpoint_streamset): + """ + asserts to_table raises error if using "all" as agg. + """ + with pytest.raises(AttributeError): + to_table(statpoint_streamset, agg="all") \ No newline at end of file From 732b34f3a7b84aceaceabdadf1c4b488419d61d8 Mon Sep 17 00:00:00 2001 From: looselycoupled Date: Wed, 13 May 2020 16:40:27 -0400 Subject: [PATCH 2/3] fixes for incorrect handling of columns arg --- btrdb/transformers.py | 2 +- tests/btrdb/test_transformers.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/btrdb/transformers.py b/btrdb/transformers.py index 35de5f1..f4ce241 100644 --- a/btrdb/transformers.py +++ b/btrdb/transformers.py @@ -151,7 +151,7 @@ def to_dataframe(streamset, columns=None, agg="mean", name_callable=None): stream_names = [[s.collection, s.name, prop] for s in streamset._streams for prop in _STAT_PROPERTIES] df.columns=pd.MultiIndex.from_tuples(stream_names) else: - df.columns = _stream_names(streamset, name_callable) + df.columns = columns if columns else _stream_names(streamset, name_callable) return df diff --git a/tests/btrdb/test_transformers.py b/tests/btrdb/test_transformers.py index 998fe3f..28638bb 100644 --- a/tests/btrdb/test_transformers.py +++ b/tests/btrdb/test_transformers.py @@ -313,10 +313,20 @@ def test_to_dataframe_column_issues_warning(self, statpoint_streamset): """ assert to_dateframe with column argument issues warning """ - columns = ["time", "test/stream0", "test/stream1", "test/stream2", "test/stream3"] + columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"] with pytest.deprecated_call(): statpoint_streamset.to_dataframe(columns=columns) + def test_to_dataframe_column(self, statpoint_streamset): + """ + assert to_dateframe with column argument actually renames columns + """ + columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"] + with pytest.deprecated_call(): + df = statpoint_streamset.to_dataframe(columns=columns) + + assert df.columns.tolist() == columns + def test_to_dataframe_multindex(self, statpoint_streamset): """ assert to_dateframe agg=all creates columned multiindex From 218477538df50e7da0bbb23d37b8976a8a0c23e1 Mon Sep 17 00:00:00 2001 From: looselycoupled Date: Wed, 13 May 2020 16:43:49 -0400 Subject: [PATCH 3/3] minor fix --- tests/btrdb/test_transformers.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/btrdb/test_transformers.py b/tests/btrdb/test_transformers.py index 28638bb..cdad89e 100644 --- a/tests/btrdb/test_transformers.py +++ b/tests/btrdb/test_transformers.py @@ -234,11 +234,9 @@ def test_to_series_name_lambda(self, streamset): """ assert to_dateframe uses name lambda """ - columns = ["stream0", "stream1", "stream2", "stream3"] result = streamset.to_series(name_callable=lambda s: s.name) assert [s.name for s in result] == ['stream0', 'stream1', 'stream2', 'stream3'] - def test_to_series_ignores_rawpoint_with_agg(self, streamset): """ assert to_series ignores agg if RawPoints are used @@ -246,7 +244,6 @@ def test_to_series_ignores_rawpoint_with_agg(self, streamset): result = streamset.to_series(agg="mean") assert len(result) == 4 - def test_to_series_statpoint(self, statpoint_streamset): """ Asserts to_series produces correct results with statpoints