Lazy loading of whole channel data (#160)
Add support for lazily loading data for a channel after opening a TDMS file:

* Adds an explicit read_data method to TdmsObject to make it obvious this is an expensive operation, and to allow extending this later to support loading a subset of the data (#39)
* Supports DAQmx and interleaved data by reading whole chunks and discarding the unneeded data
* Uses a more optimised implementation for contiguous data

This also introduces static helper methods to make initialising a TdmsFile object more straightforward: TdmsFile.read reads all data up front, TdmsFile.open reads metadata and keeps the file open so data can be read on demand, and TdmsFile.read_metadata reads metadata only. A usage sketch follows below.

Fixes #43
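
As a rough usage sketch of the new entry points (not part of this commit; the file name, group and channel names, and the object() lookup are placeholders for illustration):

from nptdms import TdmsFile

# Read everything up front (the previous default behaviour):
tdms_file = TdmsFile.read("example.tdms")

# Read only metadata, keep the file open and load one channel on demand:
tdms_file = TdmsFile.open("example.tdms")
channel = tdms_file.object("group_name", "channel_name")  # lookup assumed for illustration
channel_data = channel.read_data()  # explicit, potentially expensive read (#39)

# Read metadata only:
metadata_only = TdmsFile.read_metadata("example.tdms")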
adamreeve committed Mar 12, 2020
1 parent 6895ae8 commit 2853667
Showing 11 changed files with 498 additions and 124 deletions.
60 changes: 60 additions & 0 deletions nptdms/base_segment.py
@@ -199,6 +199,22 @@ def read_raw_data(self, f):
for chunk in range(self.num_chunks):
yield self._read_data_chunk(f, data_objects, chunk)

def read_raw_data_for_channel(self, f, channel_path):
"""Read raw data from a TDMS segment
:returns: A generator of ChannelDataChunk objects with raw channel data for
a single channel in this segment.
"""

if not self.toc_mask & toc_properties['kTocRawData']:
yield ChannelDataChunk.empty()

f.seek(self.data_position)

data_objects = [o for o in self.ordered_objects if o.has_data]
for chunk in range(self.num_chunks):
yield self._read_channel_data_chunk(f, data_objects, chunk, channel_path)

def _calculate_chunks(self):
"""
Work out the number of chunks the data is in, for cases
@@ -237,8 +253,27 @@ def _get_chunk_size(self):
for o in self.ordered_objects if o.has_data])

def _read_data_chunk(self, file, data_objects, chunk_index):
""" Read data from a chunk for all channels
"""
raise NotImplementedError("Data chunk reading must be implemented by derived classes")

def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
""" Read data from a chunk for a single channel
"""
# In the base implementation we read data for all channels
# and then select only the requested channel.
# Derived classes can implement more optimised reading.
data_chunk = self._read_data_chunk(file, data_objects, chunk_index)
try:
if data_chunk.raw_data:
return ChannelDataChunk.channel_data(data_chunk.raw_data[channel_path])
elif data_chunk.daqmx_raw_data:
return ChannelDataChunk.scaler_data(data_chunk.daqmx_raw_data[channel_path])
else:
return ChannelDataChunk.empty()
except KeyError:
return ChannelDataChunk.empty()

def _new_segment_object(self, object_path):
""" Create a new segment object for a segment
@@ -305,6 +340,31 @@ def scaler_data(data):
return DataChunk({}, data)


class ChannelDataChunk(object):
"""Data read for a single channel from a single chunk in a TDMS segment
:ivar raw_data: Raw data in this chunk for a standard TDMS channel.
:ivar daqmx_raw_data: A dictionary of scaler data in this chunk for
DAQmx raw data. Keys are the scaler id and values are data arrays.
"""

def __init__(self, data, daqmx_data):
self.raw_data = data
self.daqmx_raw_data = daqmx_data

@staticmethod
def empty():
return ChannelDataChunk(None, None)

@staticmethod
def channel_data(data):
return ChannelDataChunk(data, None)

@staticmethod
def scaler_data(data):
return ChannelDataChunk(None, data)


def read_property(f, endianness="<"):
""" Read a property from a segment's metadata """

3 changes: 3 additions & 0 deletions nptdms/channel_data.py
@@ -40,6 +40,7 @@ def __init__(self):
"""Initialise new data receiver for a TDMS object
"""
self.data = []
self.scaler_data = {}

def append_data(self, data):
"""Append data from a segment
@@ -63,6 +64,7 @@ def __init__(self, obj, memmap_dir=None):
self.path = obj.path
self.data = _new_numpy_array(
obj.data_type.nptype, obj.number_values, memmap_dir)
self.scaler_data = {}
self._data_insert_position = 0
log.debug(
"Allocated %d sample slots for %s", len(self.data), obj.path)
@@ -93,6 +95,7 @@ def __init__(self, obj, memmap_dir=None):
"""

self.path = obj.path
self.data = None
self.scaler_data = {}
self._scaler_insert_positions = {}
for scaler_id, scaler_type in obj.scaler_data_types.items():
34 changes: 32 additions & 2 deletions nptdms/reader.py
@@ -17,12 +17,29 @@ class TdmsReader(object):
def __init__(self, tdms_file):
""" Initialise a new TdmsReader
-        :param tdms_file: An opened file object.
+        :param tdms_file: Either the path to the tdms file to read or an already
+            opened file.
"""
-        self._file = tdms_file
self._segments = None
self._prev_segment_objects = {}
self.object_metadata = OrderedDict()
self._file_path = None

if hasattr(tdms_file, "read"):
# Is a file
self._file = tdms_file
else:
# Is path to a file
self._file = open(tdms_file, 'rb')
self._file_path = tdms_file

def close(self):
if self._file_path is not None:
# File path was provided so we opened the file and
# should close it.
self._file.close()
# Always drop the reference to the file
self._file = None

def read_metadata(self):
""" Read all metadata and structure information from a TdmsFile
@@ -62,6 +79,19 @@ def read_raw_data(self):
for chunk in segment.read_raw_data(self._file):
yield chunk

def read_raw_data_for_channel(self, channel_path):
""" Read raw data for a single channel, chunk by chunk
:param channel_path: The path of the channel object to read data for
:returns: A generator that yields ChannelDataChunk objects
"""
if self._segments is None:
raise RuntimeError(
"Cannot read data unless metadata has first been read")
for segment in self._segments:
for chunk in segment.read_raw_data_for_channel(self._file, channel_path):
yield chunk

def _update_object_metadata(self, segment):
""" Update object metadata using the metadata read from a single segment
"""
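
For context, a minimal sketch of how the reader method added above could be driven directly (the file path, the channel path string and the handle_* callbacks are placeholders, not part of this commit):

from nptdms.reader import TdmsReader

reader = TdmsReader("example.tdms")  # a path or an already opened file object
try:
    reader.read_metadata()  # metadata must be read before any data
    for chunk in reader.read_raw_data_for_channel("/'group'/'channel'"):
        if chunk.raw_data is not None:
            handle_raw(chunk.raw_data)  # standard TDMS channel data
        elif chunk.daqmx_raw_data is not None:
            handle_scalers(chunk.daqmx_raw_data)  # dict of scaler id -> data array
finally:
    reader.close()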
27 changes: 11 additions & 16 deletions nptdms/scaling.py
@@ -312,40 +312,36 @@ class MultiScaling(object):
def __init__(self, scalings):
self.scalings = scalings

-    def scale(self, data):
-        final_scale = len(self.scalings) - 1
-        return self._compute_scaled_data(final_scale, data, {})
-
-    def scale_daqmx(self, scaler_data):
+    def scale(self, raw_channel_data):
         final_scale = len(self.scalings) - 1
-        return self._compute_scaled_data(final_scale, None, scaler_data)
+        return self._compute_scaled_data(final_scale, raw_channel_data)

-    def _compute_scaled_data(self, scale_index, raw_data, scaler_data):
+    def _compute_scaled_data(self, scale_index, raw_channel_data):
""" Compute output data from a single scale in the set of all scalings,
computing any required input scales recursively.
"""
if scale_index == RAW_DATA_INPUT_SOURCE:
-            if raw_data is None:
+            if raw_channel_data.data is None:
                 raise Exception("Invalid scaling input source for DAQmx data")
-            return raw_data
+            return raw_channel_data.data

scaling = self.scalings[scale_index]
if scaling is None:
raise Exception(
"Cannot compute data for scale %d" % scale_index)

if isinstance(scaling, DaqMxScalerScaling):
-            return scaling.scale_daqmx(scaler_data)
+            return scaling.scale_daqmx(raw_channel_data.scaler_data)
elif hasattr(scaling, 'input_source'):
input_data = self._compute_scaled_data(
-                scaling.input_source, raw_data, scaler_data)
+                scaling.input_source, raw_channel_data)
return scaling.scale(input_data)
elif (hasattr(scaling, 'left_input_source') and
hasattr(scaling, 'right_input_source')):
left_input_data = self._compute_scaled_data(
-                scaling.left_input_source, raw_data, scaler_data)
+                scaling.left_input_source, raw_channel_data)
right_input_data = self._compute_scaled_data(
-                scaling.right_input_source, raw_data, scaler_data)
+                scaling.right_input_source, raw_channel_data)
return scaling.scale(left_input_data, right_input_data)
else:
raise ValueError("Cannot compute scaled data for %r" % scaling)
@@ -401,12 +397,11 @@ def _get_channel_scaling(properties):
properties, scale_index)
else:
log.warning("Unsupported scale type: %s", scale_type)
+            return None

if not scalings:
return None
-    if len(scalings) > 1:
-        return MultiScaling(scalings)
-    return scalings[0]
+    return MultiScaling(scalings)


_scale_regex = re.compile(r"NI_Scale\[(\d+)\]_Scale_Type")
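
One way to picture the refactored scale() call, as a sketch under assumptions: the namedtuple stand-in and the doubling scaling class are hypothetical, only MultiScaling and RAW_DATA_INPUT_SOURCE are taken from nptdms.scaling:

from collections import namedtuple

import numpy as np

from nptdms.scaling import MultiScaling, RAW_DATA_INPUT_SOURCE

# Stand-in for the channel data holders above: a single object carrying both
# plain raw data and DAQmx scaler data, which scale() now expects.
RawChannelData = namedtuple("RawChannelData", ["data", "scaler_data"])


class DoublingScaling(object):
    """ Hypothetical scaling that doubles its input, reading from raw channel data
    """
    input_source = RAW_DATA_INPUT_SOURCE

    def scale(self, data):
        return 2 * data


scaling = MultiScaling([DoublingScaling()])
raw = RawChannelData(data=np.array([1.0, 2.0, 3.0]), scaler_data={})
print(scaling.scale(raw))  # expected: [2. 4. 6.]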
