Improve handling of incomplete segments (#236)
adamreeve committed Mar 26, 2021
1 parent 79af886 commit 4d3f78f
Showing 8 changed files with 230 additions and 60 deletions.
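The central change: a truncated final chunk was previously described by a single final_chunk_proportion float applied uniformly to every channel, which is only right for interleaved data. For contiguous data, channels earlier in the chunk fill completely before later ones receive anything, so the proportion is replaced by final_chunk_lengths_override, a dict mapping each object path to its value count in the final chunk. A minimal sketch of the difference, using made-up channel sizes rather than the library's own API:

```python
# Made-up setup: three int32 channels of 2 values each, so one full chunk
# is 24 bytes, but only 12 bytes of the final chunk were written.
chunk_size, remainder = 24, 12
channels = [("channel1", 2, 4), ("channel2", 2, 4), ("channel3", 2, 4)]

# Old model: one proportion shared by every channel.
proportion = remainder / chunk_size
print({path: int(n * proportion) for path, n, _ in channels})
# {'channel1': 1, 'channel2': 1, 'channel3': 1} -- wrong for contiguous data

# New model: per-object lengths; contiguous channels fill in file order.
lengths, left = {}, remainder
for path, n, size in channels:
    if left > n * size:
        lengths[path] = n
        left -= n * size
    else:
        lengths[path] = left // size
        break
print(lengths)  # {'channel1': 2, 'channel2': 1}; channel3 gets no data
```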
4 changes: 2 additions & 2 deletions nptdms/base_segment.py
@@ -39,9 +39,9 @@ class BaseDataReader(object):
""" Abstract base class for reading data in a segment
"""

def __init__(self, num_chunks, final_chunk_proportion, endianness):
def __init__(self, num_chunks, final_chunk_lengths_override, endianness):
self.num_chunks = num_chunks
self.final_chunk_proportion = final_chunk_proportion
self.final_chunk_lengths_override = final_chunk_lengths_override
self.endianness = endianness

def read_data_chunks(self, file, data_objects, num_chunks):
25 changes: 25 additions & 0 deletions nptdms/daqmx.py
@@ -74,6 +74,31 @@ def get_daqmx_chunk_size(ordered_objects):
return sum((num_values * width) for (num_values, width) in get_buffer_dimensions(ordered_objects))


def get_daqmx_final_chunk_lengths(ordered_objects, chunk_size_bytes):
"""Compute object data lengths for a final chunk that has less data than expected
"""
object_lengths = {}
buffer_dims = get_buffer_dimensions(ordered_objects)
updated_buffer_lengths = [0] * len(buffer_dims)
bytes_remaining = chunk_size_bytes
for i, (orig_length, width) in enumerate(buffer_dims):
buffer_total_bytes = orig_length * width
if bytes_remaining > buffer_total_bytes:
updated_buffer_lengths[i] = orig_length
bytes_remaining -= buffer_total_bytes
else:
updated_buffer_lengths[i] = bytes_remaining // width
break
for obj in ordered_objects:
if not obj.has_data:
continue
buffer_indices = list(set(s.raw_buffer_index for s in obj.daqmx_metadata.scalers))
if len(buffer_indices) == 1:
object_lengths[obj.path] = updated_buffer_lengths[buffer_indices[0]]
# Else scalers are in different buffers, not sure this is even valid
return object_lengths

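For DAQmx segments the same idea applies at the raw-buffer level: buffers sit one after another within a chunk, so the loop above keeps each buffer's original length until the remaining bytes run out. A walk-through of that loop with hypothetical buffer dimensions, not taken from any real file:

```python
# Hypothetical DAQmx buffers as (number of values, width in bytes):
# buffer 0 holds 100 two-byte values, buffer 1 holds 100 four-byte values.
buffer_dims = [(100, 2), (100, 4)]
bytes_remaining = 250  # only 250 of the expected 600 bytes were written

updated_buffer_lengths = [0] * len(buffer_dims)
for i, (orig_length, width) in enumerate(buffer_dims):
    buffer_total_bytes = orig_length * width
    if bytes_remaining > buffer_total_bytes:
        updated_buffer_lengths[i] = orig_length  # buffer 0 complete, 200 bytes
        bytes_remaining -= buffer_total_bytes
    else:
        updated_buffer_lengths[i] = bytes_remaining // width  # 50 // 4 = 12
        break

print(updated_buffer_lengths)  # [100, 12]
```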

def get_buffer_dimensions(ordered_objects):
""" Returns DAQmx buffer dimensions as list of tuples of (number of values, width in bytes)
"""
19 changes: 9 additions & 10 deletions nptdms/reader.py
@@ -232,14 +232,14 @@ def read_channel_chunk_for_index(self, channel_path, index):

def _read_segment_metadata(
self, file, segment_position, index_cache, previous_segment, is_index_file):
(position, toc_mask, data_position, next_segment_pos) = self._read_lead_in(
(position, toc_mask, data_position, next_segment_pos, segment_incomplete) = self._read_lead_in(
file, segment_position, is_index_file)

segment = TdmsSegment(
position, toc_mask, next_segment_pos, data_position)

properties = segment.read_segment_objects(
file, self._prev_segment_objects, index_cache, previous_segment)
file, self._prev_segment_objects, index_cache, previous_segment, segment_incomplete)
return segment, properties

def _read_lead_in(self, file, segment_position, is_index_file=False):
@@ -273,7 +273,8 @@ def _read_lead_in(self, file, segment_position, is_index_file=False):
# Calculate data and next segment position
lead_size = 7 * 4
data_position = segment_position + lead_size + raw_data_offset
if next_segment_offset == 0xFFFFFFFFFFFFFFFF:
segment_incomplete = next_segment_offset == 0xFFFFFFFFFFFFFFFF
if segment_incomplete:
# Segment size is unknown. This can happen if LabVIEW crashes.
# Try to read until the end of the file.
log.warning(
@@ -286,7 +287,7 @@ def _read_lead_in(self, file, segment_position, is_index_file=False):
next_segment_pos = (
segment_position + next_segment_offset + lead_size)

return segment_position, toc_mask, data_position, next_segment_pos
return segment_position, toc_mask, data_position, next_segment_pos, segment_incomplete

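An incomplete segment is recognised entirely from the lead-in: an all-ones next-segment offset means the length was never written, and the reader then falls back to reading until the end of the file. A small sketch of that branch with assumed positions (file_size here is a stand-in for the real end-of-file lookup):

```python
# Assumed lead-in values: a segment at file position 0 with the standard
# 28-byte lead-in, whose length was never written (e.g. LabVIEW crashed).
segment_position, lead_size, file_size = 0, 7 * 4, 1000
next_segment_offset = 0xFFFFFFFFFFFFFFFF  # all-ones sentinel

segment_incomplete = next_segment_offset == 0xFFFFFFFFFFFFFFFF
next_segment_pos = (file_size if segment_incomplete
                    else segment_position + next_segment_offset + lead_size)
print(segment_incomplete, next_segment_pos)  # True 1000 -> read to EOF
```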
def _verify_segment_start(self, segment):
""" When reading data for a segment, check for the TDSm tag at the start of the segment in an attempt
@@ -388,13 +389,11 @@ def _number_of_segment_values(segment_object, segment):
"""
if not segment_object.has_data:
return 0
num_chunks = segment.num_chunks
final_chunk_proportion = segment.final_chunk_proportion
if final_chunk_proportion == 1.0:
return segment_object.number_values * num_chunks
if segment.final_chunk_lengths_override is None:
return segment_object.number_values * segment.num_chunks
else:
return (segment_object.number_values * (num_chunks - 1) +
int(segment_object.number_values * final_chunk_proportion))
return (segment_object.number_values * (segment.num_chunks - 1) +
segment.final_chunk_lengths_override.get(segment_object.path, 0))

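A channel's total for a segment is then full chunks plus its entry in the override dict, with channels absent from the dict contributing zero. A quick worked check with assumed numbers:

```python
# Assumed numbers: a channel with 2 values per chunk, in a segment of
# 3 chunks whose truncated final chunk held only 1 value for this channel.
number_values, num_chunks = 2, 3
final_chunk_lengths_override = {"/'group'/'channel2'": 1}

total = (number_values * (num_chunks - 1) +
         final_chunk_lengths_override.get("/'group'/'channel2'", 0))
print(total)  # 5: two full chunks of 2 values, then 1 in the final chunk
```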

def _update_object_data_type(path, obj, segment_object):
68 changes: 51 additions & 17 deletions nptdms/tdms_segment.py
@@ -19,7 +19,8 @@
DIGITAL_LINE_SCALER,
DaqmxSegmentObject,
DaqmxDataReader,
get_daqmx_chunk_size)
get_daqmx_chunk_size,
get_daqmx_final_chunk_lengths)


_struct_unpack = struct.unpack
@@ -41,7 +42,7 @@ class TdmsSegment(object):
'toc_mask',
'next_segment_pos',
'data_position',
'final_chunk_proportion',
'final_chunk_lengths_override',
'object_index',
]

@@ -51,25 +52,26 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position):
self.next_segment_pos = next_segment_pos
self.data_position = data_position
self.num_chunks = 0
self.final_chunk_proportion = 1.0
self.final_chunk_lengths_override = None
self.ordered_objects = None
self.object_index = None

def __repr__(self):
return "<TdmsSegment at position %d>" % self.position

def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment):
def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment, segment_incomplete):
"""Read segment metadata section and update object information
:param file: Open TDMS file
:param previous_segment_objects: Dictionary of path to the most
recently read segment object for a TDMS object.
:param index_cache: A SegmentIndexCache instance, or None if segment indexes are not required.
:param previous_segment: Previous segment in the file.
:param segment_incomplete: Whether the next segment offset was not set, indicating the segment may be truncated.
"""

if not self.toc_mask & toc_properties['kTocMetaData']:
self._reuse_previous_segment_metadata(previous_segment)
self._reuse_previous_segment_metadata(previous_segment, segment_incomplete)
return

endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
@@ -132,7 +134,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev

if index_cache is not None:
self.object_index = index_cache.get_index(self.ordered_objects)
self._calculate_chunks()
self._calculate_chunks(segment_incomplete)
return properties

def get_segment_object(self, object_path):
@@ -192,11 +194,11 @@ def _reuse_previous_object(
segment_obj.read_raw_data_index(file, raw_data_index_header, endianness)
self.ordered_objects.append(segment_obj)

def _reuse_previous_segment_metadata(self, previous_segment):
def _reuse_previous_segment_metadata(self, previous_segment, segment_incomplete):
try:
self.ordered_objects = previous_segment.ordered_objects
self.object_index = previous_segment.object_index
self._calculate_chunks()
self._calculate_chunks(segment_incomplete)
except AttributeError:
raise ValueError(
"kTocMetaData is not set for segment but "
@@ -267,7 +269,7 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
yield chunk

def _calculate_chunks(self):
def _calculate_chunks(self, segment_incomplete):
"""
Work out the number of chunks the data is in, for cases
where the meta data doesn't change at all so there is no
@@ -296,8 +298,41 @@ def _calculate_chunks(self):
"chunk size %d. Will attempt to read last chunk",
total_data_size, data_size)
self.num_chunks = 1 + int(total_data_size // data_size)
self.final_chunk_proportion = (
float(chunk_remainder) / float(data_size))
self.final_chunk_lengths_override = self._compute_final_chunk_lengths(
data_size, chunk_remainder, segment_incomplete)

def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_incomplete):
"""Compute object data lengths for a final chunk that has less data than expected
"""
if self._have_daqmx_objects():
return get_daqmx_final_chunk_lengths(self.ordered_objects, chunk_remainder)

obj_chunk_sizes = {}

if any(o for o in self.ordered_objects if o.has_data and o.data_type.size is None):
# Don't try to handle truncated segments with unsized data
return obj_chunk_sizes

interleaved_data = self.toc_mask & toc_properties['kTocInterleavedData']
if interleaved_data or not segment_incomplete:
for obj in self.ordered_objects:
if not obj.has_data:
continue
obj_chunk_sizes[obj.path] = (obj.number_values * chunk_remainder) // chunk_size
else:
# Have contiguous truncated data
for obj in self.ordered_objects:
if not obj.has_data:
continue
data_size = obj.number_values * obj.data_type.size
if chunk_remainder > data_size:
obj_chunk_sizes[obj.path] = obj.number_values
chunk_remainder -= data_size
else:
obj_chunk_sizes[obj.path] = chunk_remainder // obj.data_type.size
break

return obj_chunk_sizes

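The interleaved branch above scales every object's count by the written fraction of the chunk, because interleaved rows hold one value per channel and so truncate evenly; the floor division simply discards a partial final row. A check under assumed values:

```python
# Assumed values: two interleaved int32 channels, 4 values per chunk each,
# so one chunk is 32 bytes; only 20 bytes of the final chunk were written.
number_values, chunk_size, chunk_remainder = 4, 32, 20

per_channel = (number_values * chunk_remainder) // chunk_size
print(per_channel)  # 2 -- the half-written third row is dropped
```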
def _new_segment_object(self, object_path, raw_data_index_header):
""" Create a new segment object for a segment
@@ -335,11 +370,11 @@ def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offs
def _get_data_reader(self):
endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
if self._have_daqmx_objects():
return DaqmxDataReader(self.num_chunks, self.final_chunk_proportion, endianness)
return DaqmxDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)
elif self.toc_mask & toc_properties['kTocInterleavedData']:
return InterleavedDataReader(self.num_chunks, self.final_chunk_proportion, endianness)
return InterleavedDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)
else:
return ContiguousDataReader(self.num_chunks, self.final_chunk_proportion, endianness)
return ContiguousDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)

def _have_daqmx_objects(self):
data_obj_count = 0
@@ -446,9 +481,8 @@ def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path
return channel_data

def _get_channel_number_values(self, obj, chunk_index):
if (chunk_index == (self.num_chunks - 1) and
self.final_chunk_proportion != 1.0):
return int(obj.number_values * self.final_chunk_proportion)
if chunk_index == (self.num_chunks - 1) and self.final_chunk_lengths_override is not None:
return self.final_chunk_lengths_override.get(obj.path, 0)
else:
return obj.number_values

20 changes: 15 additions & 5 deletions nptdms/test/scenarios.py
@@ -10,7 +10,9 @@
segment_objects_metadata,
channel_metadata,
channel_metadata_with_no_data,
channel_metadata_with_repeated_structure)
channel_metadata_with_repeated_structure,
root_metadata,
group_metadata)


TDS_TYPE_INT8 = 1
@@ -538,24 +540,30 @@ def incomplete_last_segment():
test_file.add_segment(
("kTocMetaData", "kTocRawData", "kTocNewObjList"),
segment_objects_metadata(
root_metadata(),
group_metadata(),
channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 2),
channel_metadata("/'group'/'channel2'", TDS_TYPE_INT32, 2),
channel_metadata("/'group'/'channel3'", TDS_TYPE_INT32, 2),
),
"01 00 00 00" "02 00 00 00"
"03 00 00 00" "04 00 00 00"
"05 00 00 00" "06 00 00 00"
"07 00 00 00" "08 00 00 00"
"09 00 00 00" "0A 00 00 00"
"0B 00 00 00" "0C 00 00 00"
)
test_file.add_segment(
("kTocRawData", ),
"",
"09 00 00 00" "0A 00 00 00"
"0B 00 00 00" "0C 00 00 00",
"01 00 00 00" "02 00 00 00"
"03 00 00 00",
incomplete=True
)
expected_data = {
('group', 'channel1'): np.array([1, 2, 5, 6, 9, 10], dtype=np.int32),
('group', 'channel2'): np.array([3, 4, 7, 8, 11, 12], dtype=np.int32),
('group', 'channel1'): np.array([1, 2, 7, 8, 1, 2], dtype=np.int32),
('group', 'channel2'): np.array([3, 4, 9, 10, 3], dtype=np.int32),
('group', 'channel3'): np.array([5, 6, 11, 12], dtype=np.int32),
}
return test_file, expected_data

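The updated expected arrays follow directly from the contiguous rule: the incomplete second segment supplies 12 bytes against a 24-byte chunk, so channel1 keeps both of its values, channel2 one, and channel3 none. A sanity check in the same numpy style the tests use:

```python
import numpy as np

# Sanity check of the expected data above: segment 1 contributes two full
# chunks, and the incomplete segment's three int32 values split as 2/1/0.
channel1 = np.concatenate([
    np.array([1, 2, 7, 8], dtype=np.int32),  # two full chunks of segment 1
    np.array([1, 2], dtype=np.int32),        # truncated segment fills it first
])
print(channel1)  # [1 2 7 8 1 2]
```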
@@ -568,6 +576,8 @@ def incomplete_last_row_of_interleaved_data():
test_file.add_segment(
("kTocMetaData", "kTocRawData", "kTocNewObjList", "kTocInterleavedData"),
segment_objects_metadata(
root_metadata(),
group_metadata(),
channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 2),
channel_metadata("/'group'/'channel2'", TDS_TYPE_INT32, 2),
),
