Commit

Support streaming channel data and iterating over channel values (#170)
adamreeve committed Mar 28, 2020
1 parent ce39459 commit 3e41ee2
Showing 3 changed files with 84 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/reading.rst
@@ -95,6 +95,12 @@ For example, to compute the mean of a channel::
    channel_mean = channel_sum / channel_length

This approach can also be useful to stream TDMS data to another format on disk or into a data store.
It's also possible to stream data chunks for a single channel using :py:meth:`~nptdms.TdmsChannel.data_chunks`::

    with TdmsFile.open(tdms_file_path) as tdms_file:
        channel = tdms_file[group_name][channel_name]
        for chunk in channel.data_chunks():
            channel_chunk_data = chunk[:]
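
Because each chunk behaves like an array, aggregate values can be computed incrementally without holding the whole channel in memory. A minimal sketch under the same assumptions as above (``tdms_file_path``, ``group_name`` and ``channel_name`` are placeholders)::

    total = 0.0
    count = 0
    with TdmsFile.open(tdms_file_path) as tdms_file:
        channel = tdms_file[group_name][channel_name]
        for chunk in channel.data_chunks():
            chunk_data = chunk[:]  # read this chunk's values as an array
            total += chunk_data.sum()
            count += len(chunk_data)
    channel_mean = total / count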

In cases where you don't need to read the file data and only need to read metadata, you can
also use the static :py:meth:`~nptdms.TdmsFile.read_metadata` method::
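
    # A minimal usage sketch (assumed; the original example continues
    # beyond this excerpt). read_metadata returns a TdmsFile whose groups
    # and channels have properties available but no channel data loaded:
    tdms_file = TdmsFile.read_metadata(tdms_file_path)
    for group in tdms_file.groups():
        for channel in group.channels():
            print(channel.path, channel.properties)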
40 changes: 35 additions & 5 deletions nptdms/tdms.py
@@ -253,6 +253,13 @@ def _read_data(self, tdms_reader):

        self.data_read = True

    def _read_channel_data_chunks(self, channel):
        if self._reader is None:
            raise RuntimeError(
                "Cannot read channel data after the underlying TDMS reader is closed")
        for chunk in self._reader.read_raw_data_for_channel(channel.path):
            yield chunk

    def _read_channel_data(self, channel, offset=0, length=None):
        if offset < 0:
            raise ValueError("offset must be non-negative")
@@ -496,6 +503,12 @@ def __repr__(self):
    def __len__(self):
        return self._length

    def __iter__(self):
        if self._raw_data is not None:
            return iter(self.data)
        else:
            return self._read_data_values()
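
With ``__iter__`` defined, a channel can be iterated over directly; when the file was opened with ``TdmsFile.open``, values are produced lazily, chunk by chunk, rather than loading the whole channel first. A minimal usage sketch (the path, names and ``process`` function are placeholders)::

    with TdmsFile.open(tdms_file_path) as tdms_file:
        channel = tdms_file[group_name][channel_name]
        for value in channel:
            process(value)  # placeholder per-value handling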

    @_property_builtin
    def path(self):
        """ Path to the TDMS object for this channel
@@ -570,6 +583,17 @@ def raw_scaler_data(self):

        return self._raw_data.scaler_data

    def data_chunks(self):
        """ A generator that streams chunks of data for this channel from disk.
        This method may only be used when the TDMS file was opened without reading all data immediately.

        :rtype: Generator that yields :class:`ChannelDataChunk` objects
        """
        channel_offset = 0
        for raw_data_chunk in self._tdms_file._read_channel_data_chunks(self):
            yield ChannelDataChunk(self._tdms_file, self, raw_data_chunk, channel_offset)
            channel_offset += len(raw_data_chunk)
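
Since each yielded chunk carries its ``offset`` into the channel, chunks can also be written into a preallocated array; a sketch assuming the channel length and ``dtype`` are available from the file's metadata (the path and names are placeholders)::

    import numpy as np

    with TdmsFile.open(tdms_file_path) as tdms_file:
        channel = tdms_file[group_name][channel_name]
        data = np.empty(len(channel), dtype=channel.dtype)
        for chunk in channel.data_chunks():
            # each chunk knows where it belongs in the full channel
            data[chunk.offset:chunk.offset + len(chunk)] = chunk[:]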

    def read_data(self, offset=0, length=None, scaled=True):
        """ Reads data for this channel from the TDMS file and returns it
@@ -668,6 +692,11 @@ def as_dataframe(self, time_index=False, absolute_time=False, scaled_data=True):

        return pandas_export.from_channel(self, time_index, absolute_time, scaled_data)

    def _read_data_values(self):
        for chunk in self.data_chunks():
            for value in chunk:
                yield value

    def _scale_data(self, raw_data):
        scale = self._get_scaling()
        if scale is not None:
@@ -769,7 +798,11 @@ class GroupDataChunk(object):
    def __init__(self, tdms_file, group, raw_data_chunk, channel_offsets):
        self.name = group.name
        self._channels = OrderedDict(
-           (channel.name, ChannelDataChunk(tdms_file, channel, raw_data_chunk, channel_offsets[channel.path]))
+           (channel.name, ChannelDataChunk(
+               tdms_file,
+               channel,
+               raw_data_chunk.channel_data.get(channel.path, RawChannelDataChunk.empty()),
+               channel_offsets[channel.path]))
            for channel in group.channels())

    def __getitem__(self, channel_name):
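
For context, :class:`GroupDataChunk` objects come from file-level streaming; a usage sketch, assuming ``TdmsFile.data_chunks`` yields chunks indexable by group name as exercised by ``test_indexing_and_iterating_data_chunks`` below (the path and names are placeholders)::

    with TdmsFile.open(tdms_file_path) as tdms_file:
        for file_chunk in tdms_file.data_chunks():
            group_chunk = file_chunk[group_name]
            channel_chunk = group_chunk[channel_name]
            values = channel_chunk[:]  # this chunk's values for the channel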
@@ -800,10 +833,7 @@ def __init__(self, tdms_file, channel, raw_data_chunk, offset):
        self._channel = channel
        self.name = channel.name
        self.offset = offset
-       try:
-           self._raw_data = raw_data_chunk.channel_data[channel.path]
-       except KeyError:
-           self._raw_data = RawChannelDataChunk.empty()
+       self._raw_data = raw_data_chunk
        self._scaled_data = None

    def __len__(self):
43 changes: 43 additions & 0 deletions nptdms/test/test_tdms_file.py
@@ -175,6 +175,49 @@ def test_indexing_and_iterating_data_chunks():
        compare_arrays(actual_data, expected_data)


def test_stream_channel_data_chunks():
    """Test streaming chunks of data for a single channel from a TDMS file
    """
    test_file, expected_data = scenarios.chunked_segment().values

    with test_file.get_tempfile() as temp_file:
        with TdmsFile.open(temp_file.file) as tdms_file:
            for ((group, channel), expected_channel_data) in expected_data.items():
                actual_data = []
                for chunk in tdms_file[group][channel].data_chunks():
                    assert chunk.offset == len(actual_data)
                    actual_data.extend(chunk[:])
                compare_arrays(actual_data, expected_channel_data)


def test_iterate_channel_data_in_open_mode():
    """Test iterating over channel data after opening a file without reading data
    """
    test_file, expected_data = scenarios.chunked_segment().values

    with test_file.get_tempfile() as temp_file:
        with TdmsFile.open(temp_file.file) as tdms_file:
            for ((group, channel), expected_channel_data) in expected_data.items():
                actual_data = []
                for value in tdms_file[group][channel]:
                    actual_data.append(value)
                compare_arrays(actual_data, expected_channel_data)


def test_iterate_channel_data_in_read_mode():
    """Test iterating over channel data after reading all data
    """
    test_file, expected_data = scenarios.chunked_segment().values

    with test_file.get_tempfile() as temp_file:
        tdms_file = TdmsFile.read(temp_file.file)
        for ((group, channel), expected_channel_data) in expected_data.items():
            actual_data = []
            for value in tdms_file[group][channel]:
                actual_data.append(value)
            compare_arrays(actual_data, expected_channel_data)


def test_invalid_offset_throws():
    """ Exception is thrown when reading a subset of data with an invalid offset
    """
