Improve performance reading many small interleaved data chunks (#219)
adamreeve committed Oct 11, 2020
1 parent 73609cf commit dcc96fa
Showing 3 changed files with 47 additions and 15 deletions.
28 changes: 23 additions & 5 deletions nptdms/base_segment.py
@@ -207,8 +207,8 @@ def read_raw_data(self, f):
total_data_size, f.tell(), self.num_chunks)

data_objects = [o for o in self.ordered_objects if o.has_data]
for chunk in range(self.num_chunks):
yield self._read_data_chunk(f, data_objects, chunk)
for chunk in self._read_data_chunks(f, data_objects, self.num_chunks):
yield chunk

def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=None):
"""Read raw data from a TDMS segment
@@ -232,8 +232,8 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
if chunk_offset > 0:
f.seek(chunk_size * chunk_offset, os.SEEK_CUR)
stop_chunk = self.num_chunks if num_chunks is None else num_chunks + chunk_offset
for chunk_index in range(chunk_offset, stop_chunk):
yield self._read_channel_data_chunk(f, data_objects, chunk_index, channel_path)
for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
yield chunk

def _calculate_chunks(self):
"""
@@ -272,18 +272,36 @@ def _get_chunk_size(self):
o.data_size
for o in self.ordered_objects if o.has_data])

def _read_data_chunks(self, file, data_objects, num_chunks):
""" Read multiple data chunks at once
In the base case we read each chunk individually but subclasses can override this
"""
for chunk in range(num_chunks):
yield self._read_data_chunk(file, data_objects, chunk)

def _read_data_chunk(self, file, data_objects, chunk_index):
""" Read data from a chunk for all channels
"""
raise NotImplementedError("Data chunk reading must be implemented in base classes")

def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
""" Read multiple data chunks for a single channel at once
In the base case we read each chunk individually but subclasses can override this
"""
for chunk_index in range(chunk_offset, stop_chunk):
yield self._read_channel_data_chunk(file, data_objects, chunk_index, channel_path)

def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
""" Read data from a chunk for a single channel
"""
# In the base case we can read data for all channels
# and then select only the requested channel.
# Derived classes can implement more optimised reading.
data_chunk = self._read_data_chunk(file, data_objects, chunk_index)
return BaseSegment._data_chunk_to_channel_chunk(data_chunk, channel_path)

@staticmethod
def _data_chunk_to_channel_chunk(data_chunk, channel_path):
try:
return data_chunk.channel_data[channel_path]
except KeyError:
@@ -404,7 +422,7 @@ def fromfile(file, dtype, count, *args, **kwargs):
return np.fromfile(file, dtype=dtype, count=count, *args, **kwargs)
except (TypeError, IOError, UnsupportedOperation):
return np.frombuffer(
file.read(count * np.dtype(dtype).itemsize),
file.read(int(count * np.dtype(dtype).itemsize)),
dtype=dtype, count=count, *args, **kwargs)
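For context, a minimal self-contained sketch of the fromfile helper changed above (the buf usage example is illustrative, not part of the commit): when np.fromfile cannot handle the file object, typically an in-memory stream such as io.BytesIO, the fallback reads the raw bytes and decodes them with np.frombuffer; wrapping the byte count in int() ensures read always receives a plain Python integer.

import io
from io import UnsupportedOperation

import numpy as np


def fromfile(file, dtype, count):
    """Read `count` items of `dtype` from a file or file-like object."""
    try:
        return np.fromfile(file, dtype=dtype, count=count)
    except (TypeError, IOError, UnsupportedOperation):
        # Fallback for objects np.fromfile cannot read directly;
        # int() guards against the byte count being a NumPy scalar.
        return np.frombuffer(
            file.read(int(count * np.dtype(dtype).itemsize)),
            dtype=dtype, count=count)


buf = io.BytesIO(np.arange(4, dtype=np.int32).tobytes())
print(fromfile(buf, np.int32, 4))  # [0 1 2 3]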


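The heart of the base_segment.py change is the pair of new hooks, _read_data_chunks and _read_channel_data_chunks: the base implementations still read one chunk per call, but a subclass whose chunks are stored contiguously can satisfy the whole request with a single read. A standalone sketch of that pattern, using hypothetical classes rather than nptdms code:

import io

import numpy as np


class ChunkReader:
    """Base behaviour: one read call per requested chunk."""

    def _read_data_chunks(self, file, num_chunks, values_per_chunk):
        for _ in range(num_chunks):
            yield np.frombuffer(
                file.read(values_per_chunk * 8), dtype=np.float64)


class BulkChunkReader(ChunkReader):
    """Override: a single read covering every requested chunk."""

    def _read_data_chunks(self, file, num_chunks, values_per_chunk):
        data = np.frombuffer(
            file.read(num_chunks * values_per_chunk * 8), dtype=np.float64)
        return np.split(data, num_chunks)


raw = io.BytesIO(np.arange(6, dtype=np.float64).tobytes())
chunks = BulkChunkReader()._read_data_chunks(raw, num_chunks=3, values_per_chunk=2)
print([c.tolist() for c in chunks])  # [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]]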
6 changes: 3 additions & 3 deletions nptdms/reader.py
@@ -147,6 +147,7 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
end_segment = np.searchsorted(segment_offsets, end_index, side='left')

segment_index = start_segment
values_read = 0
for segment in self._segments[start_segment:end_segment + 1]:
self._verify_segment_start(segment)
# By default, read all chunks in a segment
@@ -155,7 +156,6 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
chunk_size = chunk_sizes[segment_index]
segment_start_index = 0 if segment_index == 0 else segment_offsets[segment_index - 1]
remaining_values_to_skip = 0
remaining_values_to_trim = 0

# For the first and last segments, we may not need to read all chunks,
# and may need to trim some data from the beginning or end of the chunk.
@@ -177,12 +177,12 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
num_values_to_trim -= final_chunk_size

num_chunks -= num_values_to_trim // chunk_size
remaining_values_to_trim = num_values_to_trim % chunk_size

for i, chunk in enumerate(
segment.read_raw_data_for_channel(self._file, channel_path, chunk_offset, num_chunks)):
skip = remaining_values_to_skip if i == 0 else 0
trim = remaining_values_to_trim if i == (num_chunks - 1) else 0
values_read += len(chunk) - skip
trim = 0 if values_read < length else values_read - length
yield _trim_channel_chunk(chunk, skip, trim)

segment_index += 1
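The reader.py change drops the precomputed remaining_values_to_trim in favour of a running values_read count: once the number of values yielded so far reaches the requested length, the excess in the current chunk is trimmed. A small standalone sketch of that logic, with list slicing standing in for _trim_channel_chunk:

def trim_chunks(chunks, skip_first, length):
    """Yield chunks, skipping values at the start and trimming past `length`."""
    values_read = 0
    for i, chunk in enumerate(chunks):
        skip = skip_first if i == 0 else 0
        values_read += len(chunk) - skip
        trim = 0 if values_read < length else values_read - length
        yield chunk[skip:len(chunk) - trim]


# Two chunks of 4 values; skip the first 2 values and read 5 in total:
print(list(trim_chunks([[0, 1, 2, 3], [4, 5, 6, 7]], skip_first=2, length=5)))
# [[2, 3], [4, 5, 6]]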
28 changes: 21 additions & 7 deletions nptdms/tdms_segment.py
@@ -24,7 +24,9 @@ class InterleavedDataSegment(BaseSegment):
def _new_segment_object(self, object_path):
return TdmsSegmentObject(object_path, self.endianness)

def _read_data_chunk(self, file, data_objects, chunk_index):
def _read_data_chunks(self, file, data_objects, num_chunks):
""" Read multiple data chunks at once
"""
# If all data types are sized and all the lengths are
# the same, then we can read all data at once with numpy,
# which is much faster
@@ -33,11 +35,23 @@ def _read_data_chunk(self, file, data_objects, chunk_index):
same_length = (len(
set((o.number_values for o in data_objects))) == 1)
if all_sized and same_length:
return self._read_interleaved_sized(file, data_objects)
return [self._read_interleaved_sized(file, data_objects, num_chunks)]
else:
return self._read_interleaved(file, data_objects)
return [self._read_interleaved(file, data_objects, num_chunks)]

def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
""" Read multiple data chunks for a single channel at once
"""
num_chunks = stop_chunk - chunk_offset
all_chunks = self._read_data_chunks(file, data_objects, num_chunks)
return [BaseSegment._data_chunk_to_channel_chunk(chunk, channel_path) for chunk in all_chunks]

def _read_data_chunk(self, file, data_objects, chunk_index):
""" Not used for interleaved data, multiple chunks are read at once
"""
raise NotImplementedError("Reading a single chunk is not implemented for interleaved data")

def _read_interleaved_sized(self, file, data_objects):
def _read_interleaved_sized(self, file, data_objects, num_chunks):
"""Read interleaved data where all channels have a sized data type and the same length
"""
log.debug("Reading interleaved data all at once")
@@ -47,7 +61,7 @@ def _read_interleaved_sized(self, file, data_objects):

# Read all data into 1 byte unsigned ints first
combined_data = read_interleaved_segment_bytes(
file, total_data_width, data_objects[0].number_values)
file, total_data_width, data_objects[0].number_values * num_chunks)

# Now get arrays for each channel
channel_data = {}
@@ -71,7 +85,7 @@ def _read_interleaved(self, file, data_objects):

return RawDataChunk.channel_data(channel_data)

def _read_interleaved(self, file, data_objects):
def _read_interleaved(self, file, data_objects, num_chunks):
"""Read interleaved data that doesn't have a numpy type"""

log.debug("Reading interleaved data point by point")
@@ -80,7 +94,7 @@ def _read_interleaved(self, file, data_objects):
for obj in data_objects:
object_data[obj.path] = obj.new_segment_data()
points_added[obj.path] = 0
while any([points_added[o.path] < o.number_values
while any([points_added[o.path] < (o.number_values * num_chunks)
for o in data_objects]):
for obj in data_objects:
if points_added[obj.path] < obj.number_values:
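The tdms_segment.py change makes the interleaved reader answer a request for several chunks with one bulk read: when every channel has a fixed-width dtype and the same number of values, all chunks form one contiguous block that can be read as a 2D byte array, with each channel recovered by slicing its columns. A self-contained sketch under those assumptions (hypothetical helper, not the nptdms implementation; native endianness assumed):

import io

import numpy as np


def read_interleaved_sized(file, dtypes, values_per_chunk, num_chunks):
    """Read all chunks of interleaved, fixed-width channels in one go."""
    row_width = sum(dt.itemsize for dt in dtypes)
    rows = values_per_chunk * num_chunks
    # One bulk read, viewed as a (rows x row_width) array of bytes
    raw = np.frombuffer(file.read(row_width * rows), dtype=np.uint8)
    raw = raw.reshape(rows, row_width)
    channel_data = []
    offset = 0
    for dt in dtypes:
        # Slice out this channel's columns and reinterpret them as its dtype
        col = raw[:, offset:offset + dt.itemsize]
        channel_data.append(np.ascontiguousarray(col).view(dt).reshape(rows))
        offset += dt.itemsize
    return channel_data


# Two int16 channels interleaved, two values per chunk, two chunks:
buf = io.BytesIO(np.array([1, 10, 2, 20, 3, 30, 4, 40], dtype=np.int16).tobytes())
a, b = read_interleaved_sized(buf, [np.dtype(np.int16)] * 2, 2, 2)
print(a.tolist(), b.tolist())  # [1, 2, 3, 4] [10, 20, 30, 40]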
