Reduce memory usage for channel indexes in open mode (#232)

adamreeve committed Mar 12, 2021
1 parent 40e8409 commit 69710c3
Showing 4 changed files with 167 additions and 73 deletions.
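In outline: the old code built two int64 arrays per channel, spanning every segment, as soon as any channel was read. The new code builds one cumulative-offsets array per channel on first access, trims it to the span of segments that actually contain the channel, and shares storage between channels whose offsets are equal. A minimal standalone sketch of that lazy, trimmed index pattern (class and method names are illustrative, not nptdms internals):

import numpy as np

class LazyOffsetIndex:
    def __init__(self, segment_value_counts):
        # segment_value_counts: channel path -> per-segment value counts,
        # with zero where a segment has no data for the channel
        self._counts = segment_value_counts
        self._offsets = {}  # built on demand, one channel at a time

    def offsets_for(self, channel_path):
        try:
            return self._offsets[channel_path]
        except KeyError:
            counts = np.asarray(self._counts[channel_path], dtype=np.int64)
            nonzero = np.flatnonzero(counts)
            if len(nonzero) == 0:
                # Channel has no data in any segment
                result = (len(counts), np.array([], dtype=np.int64))
            else:
                first, last = nonzero[0], nonzero[-1]
                # Cumulative offsets over the occupied segment range only
                result = (int(first), np.cumsum(counts[first:last + 1]))
            self._offsets[channel_path] = result
            return result
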
161 changes: 102 additions & 59 deletions nptdms/reader.py
@@ -9,7 +9,7 @@
from nptdms.common import ObjectPath, toc_properties
from nptdms.utils import Timer, OrderedDict
from nptdms.base_segment import RawChannelDataChunk
-from nptdms.tdms_segment import TdmsSegment
+from nptdms.tdms_segment import TdmsSegment, SegmentIndexCache
from nptdms.log import log_manager


@@ -33,9 +33,7 @@ def __init__(self, tdms_file):
self.object_metadata = OrderedDict()
self._file_path = None
self._index_file_path = None
-
-        self._segment_channel_offsets = None
-        self._segment_chunk_sizes = None
+        self._segment_channel_offsets = {}

if hasattr(tdms_file, "read"):
# Is a file
@@ -60,8 +58,10 @@ def close(self):
# Otherwise always remove reference to the file
self._file = None

-    def read_metadata(self):
+    def read_metadata(self, require_segment_indexes=False):
""" Read all metadata and structure information from a TdmsFile
+
+        :param require_segment_indexes: Whether to create segment object indexes to allow lookup of objects by path.
"""
self._ensure_open()

@@ -78,11 +78,12 @@ def read_metadata(self):
with Timer(log, "Read metadata"):
# Read metadata first to work out how much space we need
previous_segment = None
+            index_cache = SegmentIndexCache() if require_segment_indexes else None
while True:
start_position = file.tell()
try:
segment = self._read_segment_metadata(
-                        file, segment_position, previous_segment, reading_index_file)
+                        file, segment_position, index_cache, previous_segment, reading_index_file)
except EOFError:
# We've finished reading the file
break
@@ -130,20 +131,25 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
if self._segments is None:
raise RuntimeError("Cannot read data unless metadata has first been read")

-        if self._segment_channel_offsets is None:
-            with Timer(log, "Build data index"):
-                self._build_index()
-        segment_offsets = self._segment_channel_offsets[channel_path]
-        chunk_sizes = self._segment_chunk_sizes[channel_path]
+        try:
+            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]
+        except KeyError:
+            with Timer(log, "Build data index for channel"):
+                self._build_index(channel_path)
+            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]

object_metadata = self.object_metadata[channel_path]
+        max_length_from_offset = object_metadata.num_values - offset
if length is None:
-            length = object_metadata.num_values - offset
+            length = max_length_from_offset
+        else:
+            # Make sure we're not trying to read more data than is actually available
+            length = min(length, max_length_from_offset)
end_index = offset + length

# Binary search to find first and last segments to read
-        start_segment = np.searchsorted(segment_offsets, offset, side='right')
-        end_segment = np.searchsorted(segment_offsets, end_index, side='left')
+        start_segment = first_segment + np.searchsorted(segment_offsets, offset, side='right')
+        end_segment = first_segment + np.searchsorted(segment_offsets, end_index, side='left')

segment_index = start_segment
values_read = 0
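
Here segment_offsets[j] holds the channel's running value total through segment first_segment + j, so np.searchsorted turns a value offset into a segment number without scanning. A worked example with invented counts:

import numpy as np

# Hypothetical channel with data only in segments 3..6,
# holding 100, 250, 50 and 400 values respectively
first_segment = 3
segment_offsets = np.cumsum([100, 250, 50, 400])  # [100, 350, 400, 800]

offset, length = 120, 200  # read values [120, 320)
end_index = offset + length

# side='right' so an offset equal to a segment's end starts at the next one;
# side='left' so an end index equal to a segment's end stops at that one
start_segment = first_segment + np.searchsorted(segment_offsets, offset, side='right')
end_segment = first_segment + np.searchsorted(segment_offsets, end_index, side='left')
print(start_segment, end_segment)  # 4 4 -- both bounds fall in segment 4
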
@@ -152,8 +158,10 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
# By default, read all chunks in a segment
chunk_offset = 0
num_chunks = segment.num_chunks
-            chunk_size = chunk_sizes[segment_index]
-            segment_start_index = 0 if segment_index == 0 else segment_offsets[segment_index - 1]
+            segment_obj = segment.get_segment_object(channel_path)
+            chunk_size = 0 if segment_obj is None else segment_obj.number_values
+            segment_start_index = (
+                0 if segment_index == first_segment else segment_offsets[segment_index - first_segment - 1])
remaining_values_to_skip = 0

# For the first and last segments, we may not need to read all chunks,
@@ -165,7 +173,7 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
num_chunks -= chunk_offset
if segment_index == end_segment:
# Note: segment_index may be both start and end
-                segment_end_index = segment_offsets[segment_index]
+                segment_end_index = segment_offsets[segment_index - first_segment]
num_values_to_trim = segment_end_index - end_index

# Account for segments where the final chunk is truncated
@@ -196,16 +204,20 @@ def read_channel_chunk_for_index(self, channel_path, index):
if self._segments is None:
raise RuntimeError("Cannot read data unless metadata has first been read")

-        if self._segment_channel_offsets is None:
-            with Timer(log, "Build data index"):
-                self._build_index()
-        segment_offsets = self._segment_channel_offsets[channel_path]
+        try:
+            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]
+        except KeyError:
+            with Timer(log, "Build data index for channel"):
+                self._build_index(channel_path)
+            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]

# Binary search to find the segment to read
-        segment_index = np.searchsorted(segment_offsets, index, side='right')
+        segment_index = first_segment + np.searchsorted(segment_offsets, index, side='right')
segment = self._segments[segment_index]
-        chunk_size = self._segment_chunk_sizes[channel_path][segment_index]
-        segment_start_index = segment_offsets[segment_index - 1] if segment_index > 0 else 0
+        segment_obj = segment.get_segment_object(channel_path)
+        chunk_size = 0 if segment_obj is None else segment_obj.number_values
+        segment_start_index = (
+            0 if segment_index == first_segment else segment_offsets[segment_index - first_segment - 1])

index_in_segment = index - segment_start_index
chunk_index = index_in_segment // chunk_size
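
Chunk lookup follows the same pattern, then integer-divides the position within the segment by the chunk size. Continuing the invented numbers from the earlier sketch:

import numpy as np

first_segment = 3
segment_offsets = np.cumsum([100, 250, 50, 400])  # [100, 350, 400, 800]
index, chunk_size = 210, 50  # assume 50 values per chunk in the matched segment

segment_index = first_segment + np.searchsorted(segment_offsets, index, side='right')  # 4
segment_start_index = (
    0 if segment_index == first_segment else segment_offsets[segment_index - first_segment - 1])  # 100
index_in_segment = index - segment_start_index  # 110
chunk_index = index_in_segment // chunk_size    # chunk 2 of segment 4
chunk_offset = segment_start_index + chunk_index * chunk_size  # starts at value 200
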
@@ -216,7 +228,7 @@ def read_channel_chunk_for_index(self, channel_path, index):
return chunk_data, chunk_offset

def _read_segment_metadata(
-            self, file, segment_position, previous_segment=None, is_index_file=False):
+            self, file, segment_position, index_cache, previous_segment=None, is_index_file=False):
(position, toc_mask, endianness, data_position, raw_data_offset,
next_segment_offset, next_segment_pos) = self._read_lead_in(file, segment_position, is_index_file)

@@ -225,7 +237,7 @@ def _read_segment_metadata(
next_segment_pos, raw_data_offset, data_position)

segment.read_segment_objects(
-            file, self._prev_segment_objects, previous_segment)
+            file, self._prev_segment_objects, index_cache, previous_segment)
return segment

def _read_lead_in(self, file, segment_position, is_index_file=False):
@@ -237,12 +249,11 @@ def _read_lead_in(self, file, segment_position, is_index_file=False):
raise ValueError(
"Segment does not start with %r, but with %r" % (expected_tag, tag))

log.debug("Reading segment at %d", segment_position)

# Next four bytes are table of contents mask
toc_mask = types.Int32.read(file)

if log.isEnabledFor(logging.DEBUG):
log.debug("Reading segment at %d", segment_position)
for prop_name, prop_mask in toc_properties.items():
prop_is_set = (toc_mask & prop_mask) != 0
log.debug("Property %s is %s", prop_name, prop_is_set)
@@ -270,10 +281,8 @@ def _read_lead_in(self, file, segment_position, is_index_file=False):
next_segment_pos = self._get_data_file_size()
next_segment_offset = next_segment_pos - segment_position - lead_size
else:
log.debug("Next segment offset = %d, raw data offset = %d",
next_segment_offset, raw_data_offset)
log.debug("Data size = %d b",
next_segment_offset - raw_data_offset)
log.debug("Next segment offset = %d, raw data offset = %d, data size = %d b",
next_segment_offset, raw_data_offset, next_segment_offset - raw_data_offset)
next_segment_pos = (
segment_position + next_segment_offset + lead_size)

@@ -311,7 +320,8 @@ def _update_object_metadata(self, segment):
object_metadata = self._get_or_create_object(path)
object_metadata.num_values += _number_of_segment_values(segment_object, segment)
_update_object_data_type(path, object_metadata, segment_object)
-            _update_object_scaler_data_types(path, object_metadata, segment_object)
+            if segment_object.scaler_data_types is not None:
+                _update_object_scaler_data_types(path, object_metadata, segment_object)

def _update_object_properties(self, segment):
""" Update object properties using any properties in a segment
@@ -332,33 +342,41 @@ def _get_or_create_object(self, path):
self.object_metadata[path] = obj
return obj

-    def _build_index(self):
+    def _build_index(self, channel_path):
""" Builds an index into the segment data for faster lookup of values
-        _segment_channel_offsets provides data offset at the end of each segment per channel
-        _segment_chunk_sizes provides chunk sizes in each segment per channel
"""
-        data_objects = [
-            path
-            for (path, obj) in self.object_metadata.items()
-            if ObjectPath.from_string(path).is_channel]
num_segments = len(self._segments)

-        segment_num_values = {
-            path: np.zeros(num_segments, dtype=np.int64) for path in data_objects}
-        segment_chunk_sizes = {
-            path: np.zeros(num_segments, dtype=np.int64) for path in data_objects}
+        # Get number of values for this channel in each segment
+        segment_num_values = np.zeros(num_segments, dtype=np.int64)
+        first_segment = -1
+        last_segment = -1

for i, segment in enumerate(self._segments):
-            for obj in segment.ordered_objects:
-                if not obj.has_data:
-                    continue
-                segment_chunk_sizes[obj.path][i] = obj.number_values if obj.has_data else 0
-                segment_num_values[obj.path][i] = _number_of_segment_values(obj, segment)
-
-        self._segment_chunk_sizes = segment_chunk_sizes
-        self._segment_channel_offsets = {
-            path: np.cumsum(segment_count) for (path, segment_count) in segment_num_values.items()}
+            obj_index = segment.object_index.get(channel_path)
+            if obj_index is not None:
+                segment_obj = segment.ordered_objects[obj_index]
+                num_values = _number_of_segment_values(segment_obj, segment)
+                if num_values > 0:
+                    segment_num_values[i] = num_values
+                    last_segment = i
+                    if first_segment == -1:
+                        first_segment = i

+        # Now use the cumulative sum to get the total channel value count
+        # at the end of each segment.
+        if first_segment == -1:
+            first_segment = num_segments
+            last_segment = num_segments
+        channel_offsets = np.cumsum(segment_num_values[first_segment:last_segment + 1])
+
+        # It's likely that many channels will have the same shaped data,
+        # so de-duplicate these arrays to reduce memory usage.
+        existing_arrays = (xs for (_, xs) in self._segment_channel_offsets.values())
+        channel_offsets = _deduplicate_array(channel_offsets, existing_arrays)
+        self._segment_channel_offsets[channel_path] = (first_segment, channel_offsets)

def _ensure_open(self):
if self._file is None:
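
Trimming to the [first_segment, last_segment] span is where most of the memory comes back: a channel present in only a few segments of a long file now costs a few index entries rather than one per segment. An invented example:

import numpy as np

# Hypothetical file with 1000 segments where this channel only has
# data in segments 400..402
num_segments = 1000
segment_num_values = np.zeros(num_segments, dtype=np.int64)
segment_num_values[400:403] = (1000, 1000, 500)

first_segment, last_segment = 400, 402
channel_offsets = np.cumsum(segment_num_values[first_segment:last_segment + 1])
print(first_segment, channel_offsets)  # 400 [1000 2000 2500] -- 3 entries, not 1000
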
Expand Down Expand Up @@ -394,13 +412,12 @@ def _update_object_data_type(path, obj, segment_object):
def _update_object_scaler_data_types(path, obj, segment_object):
""" Update the DAQmx scaler data types for an object using its segment metadata
"""
-    if segment_object.scaler_data_types is not None:
-        if obj.scaler_data_types is not None and obj.scaler_data_types != segment_object.scaler_data_types:
-            raise ValueError(
-                "Segment data doesn't have the same scaler data types as previous "
-                "segments for objects %s. Expected types %s but got %s" %
-                (path, obj.scaler_data_types, segment_object.scaler_data_types))
-        obj.scaler_data_types = segment_object.scaler_data_types
+    if obj.scaler_data_types is not None and obj.scaler_data_types != segment_object.scaler_data_types:
+        raise ValueError(
+            "Segment data doesn't have the same scaler data types as previous "
+            "segments for objects %s. Expected types %s but got %s" %
+            (path, obj.scaler_data_types, segment_object.scaler_data_types))
+    obj.scaler_data_types = segment_object.scaler_data_types


class ObjectMetadata(object):
@@ -425,3 +442,29 @@ def _trim_channel_chunk(chunk, skip=0, trim=0):
scale_id: d[skip:len(d) - trim]
for (scale_id, d) in chunk.scaler_data.items()}
return RawChannelDataChunk(data, scaler_data)
+
+
+def _deduplicate_array(xs, candidates):
+    """ Reduce memory usage by replacing an array with a reference to an existing array if equal
+    """
+    for candidate in candidates:
+        if _array_equal(xs, candidate):
+            return candidate
+    return xs
+
+
+def _array_equal(a, b, chunk_size=100):
+    """ Compare two arrays for equality
+    """
+    # Numpy array_equal compares all elements rather than comparing one at a time and short-circuiting when it
+    # finds a difference. Break up the comparison into chunks to make this faster. Adapted from:
+    # https://stackoverflow.com/questions/26260848/numpy-fast-check-for-complete-array-equality-like-matlabs-isequal
+    if len(a) != len(b):
+        return False
+
+    num_chunks = (len(a) + chunk_size - 1) // chunk_size
+    for i in range(num_chunks):
+        offset = i * chunk_size
+        if not (a[offset:offset+chunk_size] == b[offset:offset+chunk_size]).all():
+            return False
+    return True
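
These two new helpers do the sharing: _deduplicate_array returns an existing equal array in place of a new one, and _array_equal compares in small chunks so a mismatch found early avoids touching the whole arrays, unlike np.array_equal. A usage sketch, assuming the two helpers above are in scope (arrays and the generator argument are made up for illustration):

import numpy as np

a = np.cumsum(np.full(100_000, 250, dtype=np.int64))
b = np.cumsum(np.full(100_000, 250, dtype=np.int64))  # same shape, equal values
c = a.copy()
c[0] = -1  # differs in the very first element

assert _array_equal(a, b)
assert not _array_equal(a, c)  # returns after comparing only the first chunk
assert _deduplicate_array(b, iter([a])) is a  # b's storage can be dropped
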
6 changes: 3 additions & 3 deletions nptdms/tdms.py
@@ -127,7 +127,7 @@ def __init__(self, file, raw_timestamps=False, memmap_dir=None, read_metadata_on

self._reader = TdmsReader(file)
try:
-            self._read_file(self._reader, read_metadata_only)
+            self._read_file(self._reader, read_metadata_only, keep_open)
finally:
if not keep_open:
self._reader.close()
@@ -222,8 +222,8 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.close()

-    def _read_file(self, tdms_reader, read_metadata_only):
-        tdms_reader.read_metadata()
+    def _read_file(self, tdms_reader, read_metadata_only, keep_open):
+        tdms_reader.read_metadata(require_segment_indexes=keep_open)

# Use object metadata to build group and channel objects
group_properties = OrderedDict()
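
Segment object indexes are only useful when the file stays open and data is read on demand, so they are now keyed off keep_open. A usage sketch of the two modes, based on nptdms's documented API (file, group, and channel names are placeholders):

from nptdms import TdmsFile

# Open mode keeps the file handle alive (keep_open=True), so reads happen
# on demand and the reader may need to map value offsets back to segments:
with TdmsFile.open("example.tdms") as tdms_file:
    channel = tdms_file["group"]["channel"]
    head = channel[:100]  # triggers _build_index for this channel only

# TdmsFile.read loads everything eagerly and closes the file, so no
# segment indexes are required:
all_data = TdmsFile.read("example.tdms")["group"]["channel"][:]
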
