Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 105 additions & 19 deletions btrdb/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,36 @@
import csv
import contextlib
from collections import OrderedDict
from warnings import warn

##########################################################################
## Helper Functions
##########################################################################

_STAT_PROPERTIES = ('min', 'mean', 'max', 'count', 'stddev')

def _get_time_from_row(row):
for item in row:
if item: return item.time
raise Exception("Row contains no data")


def _stream_names(streamset):
def _stream_names(streamset, func):
"""
private convenience function to come up with proper final stream names
before sending a collection of streams (dataframe, etc.) back to the
user.
"""
return tuple(
s.collection + "/" + s.name \
for s in streamset._streams
func(s) for s in streamset._streams
)


##########################################################################
## Transform Functions
##########################################################################

def to_series(streamset, datetime64_index=True, agg="mean"):
def to_series(streamset, datetime64_index=True, agg="mean", name_callable=None):
"""
Returns a list of Pandas Series objects indexed by time

Expand All @@ -55,14 +62,26 @@ def to_series(streamset, datetime64_index=True, agg="mean"):
from. Must be one of "min", "mean", "max", "count", or "stddev". This
argument is ignored if RawPoint values are passed into the function.

name_callable : lambda, default: lambda s: s.collection + "/" + s.name
Sprecify a callable that can be used to determine the series name given a
Stream object.

"""
try:
import pandas as pd
except ImportError:
raise ImportError("Please install Pandas to use this transformation function.")

# TODO: allow this at some future point
if agg == "all":
raise AttributeError("cannot use 'all' as aggregate at this time")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if agg is something other than mean, min, max, etc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyError I believe. I'm fine with not protecting against that.


if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name


result = []
stream_names = _stream_names(streamset)
stream_names = _stream_names(streamset, name_callable)

for idx, output in enumerate(streamset.values()):
times, values = [], []
Expand All @@ -82,30 +101,59 @@ def to_series(streamset, datetime64_index=True, agg="mean"):
return result


def to_dataframe(streamset, columns=None, agg="mean"):
def to_dataframe(streamset, columns=None, agg="mean", name_callable=None):
"""
Returns a Pandas DataFrame object indexed by time and using the values of a
stream for each column.

Parameters
----------
columns: sequence
column names to use for DataFrame
column names to use for DataFrame. Deprecated and not compatible with name_callable.

agg : str, default: "mean"
Specify the StatPoint field (e.g. aggregating function) to create the Series
from. Must be one of "min", "mean", "max", "count", or "stddev". This
argument is ignored if RawPoint values are passed into the function.
from. Must be one of "min", "mean", "max", "count", "stddev", or "all". This
argument is ignored if not using StatPoints.

name_callable : lambda, default: lambda s: s.collection + "/" + s.name
Sprecify a callable that can be used to determine the series name given a
Stream object. This is not compatible with agg == "all" at this time


"""
try:
import pandas as pd
except ImportError:
raise ImportError("Please install Pandas to use this transformation function.")

stream_names = _stream_names(streamset)
columns = columns if columns else ["time"] + list(stream_names)
return pd.DataFrame(to_dict(streamset,agg=agg), columns=columns).set_index("time")
# deprecation warning added in v5.8
if columns:
warn("the columns argument is deprecated and will be removed in a future release", DeprecationWarning, stacklevel=2)

# TODO: allow this at some future point
if agg == "all" and name_callable is not None:
raise AttributeError("cannot provide name_callable when using 'all' as aggregate at this time")

# do not allow agg="all" with RawPoints
if agg == "all" and streamset.allow_window:
agg=""

# default arg values
if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name


df = pd.DataFrame(to_dict(streamset,agg=agg))
df = df.set_index("time")

if agg == "all" and not streamset.allow_window:
stream_names = [[s.collection, s.name, prop] for s in streamset._streams for prop in _STAT_PROPERTIES]
df.columns=pd.MultiIndex.from_tuples(stream_names)
else:
df.columns = columns if columns else _stream_names(streamset, name_callable)

return df


def to_array(streamset, agg="mean"):
Expand All @@ -126,6 +174,10 @@ def to_array(streamset, agg="mean"):
except ImportError:
raise ImportError("Please install Numpy to use this transformation function.")

# TODO: allow this at some future point
if agg == "all":
raise AttributeError("cannot use 'all' as aggregate at this time")

results = []
for points in streamset.values():
segment = []
Expand All @@ -138,7 +190,7 @@ def to_array(streamset, agg="mean"):
return np.array(results)


def to_dict(streamset, agg="mean"):
def to_dict(streamset, agg="mean", name_callable=None):
"""
Returns a list of OrderedDict for each time code with the appropriate
stream data attached.
Expand All @@ -150,9 +202,17 @@ def to_dict(streamset, agg="mean"):
keys. Must be one of "min", "mean", "max", "count", or "stddev". This
argument is ignored if RawPoint values are passed into the function.

name_callable : lambda, default: lambda s: s.collection + "/" + s.name
Sprecify a callable that can be used to determine the series name given a
Stream object.

"""
if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name

data = []
stream_names = _stream_names(streamset)
stream_names = _stream_names(streamset, name_callable)

for row in streamset.rows():
item = OrderedDict({
"time": _get_time_from_row(row),
Expand All @@ -161,12 +221,16 @@ def to_dict(streamset, agg="mean"):
if row[idx].__class__.__name__ == "RawPoint":
item[col] = row[idx].value if row[idx] else None
else:
item[col] = getattr(row[idx], agg) if row[idx] else None
if agg == "all":
for stat in _STAT_PROPERTIES:
item["{}-{}".format(col, stat)] = getattr(row[idx], stat) if row[idx] else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

else:
item[col] = getattr(row[idx], agg) if row[idx] else None
data.append(item)
return data


def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"):
def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean", name_callable=None):
"""
Saves stream data as a CSV file.

Expand All @@ -187,8 +251,19 @@ def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"):
Specify the StatPoint field (e.g. aggregating function) to return when
limiting results. Must be one of "min", "mean", "max", "count", or "stddev".
This argument is ignored if RawPoint values are passed into the function.

name_callable : lambda, default: lambda s: s.collection + "/" + s.name
Sprecify a callable that can be used to determine the series name given a
Stream object.
"""

# TODO: allow this at some future point
if agg == "all":
raise AttributeError("cannot use 'all' as aggregate at this time")

if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to raise something here? It might be confusing if the user expects the columns to have custom names, but they aren't appearing and they wouldn't know why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. this is mostly checking for None and making sure they user gave us something valid. I'll add a ticket to rethink this in the future.


@contextlib.contextmanager
def open_path_or_file(path_or_file):
if isinstance(path_or_file, str):
Expand All @@ -203,7 +278,7 @@ def open_path_or_file(path_or_file):
file_to_close.close()

with open_path_or_file(fobj) as csvfile:
stream_names = _stream_names(streamset)
stream_names = _stream_names(streamset, name_callable)
fieldnames = fieldnames if fieldnames else ["time"] + list(stream_names)

writer = csv.DictWriter(csvfile, fieldnames=fieldnames, dialect=dialect)
Expand All @@ -213,7 +288,7 @@ def open_path_or_file(path_or_file):
writer.writerow(item)


def to_table(streamset, agg="mean"):
def to_table(streamset, agg="mean", name_callable=None):
"""
Returns string representation of the data in tabular form using the tabulate
library.
Expand All @@ -225,13 +300,24 @@ def to_table(streamset, agg="mean"):
from. Must be one of "min", "mean", "max", "count", or "stddev". This
argument is ignored if RawPoint values are passed into the function.

name_callable : lambda, default: lambda s: s.collection + "/" + s.name
Sprecify a callable that can be used to determine the column name given a
Stream object.

"""
try:
from tabulate import tabulate
except ImportError:
raise ImportError("Please install tabulate to use this transformation function.")

return tabulate(streamset.to_dict(agg=agg), headers="keys")
# TODO: allow this at some future point
if agg == "all":
raise AttributeError("cannot use 'all' as aggregate at this time")

if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name

return tabulate(streamset.to_dict(agg=agg, name_callable=name_callable), headers="keys")


##########################################################################
Expand Down
Loading